# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
import time
import os
import json
from tests.functional import *
from ptl.utils.pbs_snaputils import *
class TestPBSSnapshot(TestFunctional):
"""
Test suit with unit tests for the pbs_snapshot tool
"""
pbs_snapshot_path = None
snapdirs = []
snaptars = []
parent_dir = os.getcwd()
def setUp(self):
TestFunctional.setUp(self)
# Create a custom resource called 'ngpus'
# This will help us test parts of PBSSnapUtils which handle resources
attr = {"type": "long", "flag": "nh"}
self.server.manager(MGR_CMD_CREATE, RSC, attr,
id="ngpus", expect=True,
sudo=True)
# Check whether pbs_snapshot is accessible
try:
self.pbs_snapshot_path = os.path.join(
self.server.pbs_conf["PBS_EXEC"], "sbin", "pbs_snapshot")
ret = self.du.run_cmd(cmd=[self.pbs_snapshot_path, "-h"])
if ret['rc'] != 0:
self.pbs_snapshot_path = None
except Exception:
self.pbs_snapshot_path = None
# Check whether the user has root access or not
# pbs_snapshot only supports being run as root, so skip the entire
# testsuite if the user doesn't have root privileges
ret = self.du.run_cmd(
cmd=["ls", os.path.join(os.sep, "root")], sudo=True)
if ret['rc'] != 0:
self.skipTest("pbs_snapshot/PBSSnapUtils need root privileges")
def setup_sc(self, sched_id, partition, port,
sched_priv=None, sched_log=None):
"""
Setup a scheduler
:param sched_id: id of the scheduler
:type sched_id: str
:param partition: partition name for the scheduler (e.g "P1", "P1,P2")
:type partition: str
:param port: The port number string for the scheduler
:type port: str
:param sched_priv: 'sched_priv' (full path) for the scheduler
:type sched_priv: str
:param sched_log: 'sched_log' (full path) for the scheduler
:type sched_log: str
:param log_filter: log filter value for the scheduler
:type log_filter: int
"""
a = {'partition': partition,
'sched_host': self.server.hostname,
'sched_port': port}
if sched_priv is not None:
a['sched_priv'] = sched_priv
if sched_log is not None:
a['sched_log'] = sched_log
self.server.manager(MGR_CMD_CREATE, SCHED, a, id=sched_id)
if 'sched_priv' in a:
sched_dir = os.path.dirname(sched_priv)
self.scheds[sched_id].create_scheduler(sched_dir)
self.scheds[sched_id].start(sched_dir)
else:
self.scheds[sched_id].create_scheduler()
self.scheds[sched_id].start()
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id=sched_id, expect=True)
def setup_queues_nodes(self, num_partitions):
"""
Given a no. of partitions, create equal no. of associated queues
and nodes
:param num_partitions: number of partitions
:type num_partitions: int
:return a tuple of lists of queue and node ids:
([q1, q1, ..], [n1, n2, ..])
"""
queues = []
nodes = []
a_q = {"queue_type": "execution",
"started": "True",
"enabled": "True"}
a_n = {"resources_available.ncpus": 2}
self.server.create_vnodes("vnode", a_n, (num_partitions + 1),
self.mom)
for i in range(num_partitions):
partition_id = "P" + str(i + 1)
# Create queue i + 1 with partition i + 1
id_q = "wq" + str(i + 1)
queues.append(id_q)
a_q["partition"] = partition_id
self.server.manager(MGR_CMD_CREATE, QUEUE, a_q, id=id_q)
# Set the partition i + 1 on node i
id_n = "vnode[" + str(i) + "]"
nodes.append(id_n)
a = {"partition": partition_id}
self.server.manager(MGR_CMD_SET, NODE, a, id=id_n, expect=True)
return (queues, nodes)
def take_snapshot(self, acct_logs=None, daemon_logs=None,
obfuscate=None, with_sudo=True, hosts=None):
"""
Take a snapshot using pbs_snapshot command
:param acct_logs: Number of accounting logs to capture
:type acct_logs: int
:param daemon_logs: Number of daemon logs to capture
:type daemon_logs: int
:param obfuscate: Obfuscate information?
:type obfuscate: bool
:param with_sudo: use the --with-sudo option?
:type with_sudo: bool
:param hosts: list of additional hosts to capture information from
:type list
:return a tuple of name of tarball and snapshot directory captured:
(tarfile, snapdir)
"""
if self.pbs_snapshot_path is None:
self.skip_test("pbs_snapshot not found")
snap_cmd = [self.pbs_snapshot_path, "-o", self.parent_dir]
if acct_logs is not None:
snap_cmd.append("--accounting-logs=" + str(acct_logs))
if daemon_logs is not None:
snap_cmd.append("--daemon-logs=" + str(daemon_logs))
if obfuscate:
snap_cmd.append("--obfuscate")
if with_sudo:
snap_cmd.append("--with-sudo")
if hosts is not None:
hosts_str = ",".join(hosts)
snap_cmd.append("--additional-hosts=" + hosts_str)
ret = self.du.run_cmd(cmd=snap_cmd, logerr=False)
self.assertEquals(ret['rc'], 0)
# Get the name of the tarball that was created
# pbs_snapshot prints to stdout only the following:
# "Snapshot available at: "
self.assertTrue(len(ret['out']) > 0)
snap_out = ret['out'][0]
output_tar = snap_out.split(":")[1]
output_tar = output_tar.strip()
# Check that the output tarball was created
self.assertTrue(os.path.isfile(output_tar),
"%s not found" % (output_tar))
# Unwrap the tarball
tar = tarfile.open(output_tar)
tar.extractall(path=self.parent_dir)
tar.close()
# snapshot directory name = .tgz[:-4]
snap_dir = output_tar[:-4]
# Check that the directory exists
self.assertTrue(os.path.isdir(snap_dir))
self.snapdirs.append(snap_dir)
self.snaptars.append(output_tar)
return (output_tar, snap_dir)
def test_capture_server(self):
"""
Test the 'capture_server' interface of PBSSnapUtils
"""
# Set something on the server so we can match it later
job_hist_duration = "12:00:00"
attr_list = {"job_history_enable": "True",
"job_history_duration": job_hist_duration}
self.server.manager(MGR_CMD_SET, SERVER, attr_list)
num_daemon_logs = 2
num_acct_logs = 5
with PBSSnapUtils(out_dir=self.parent_dir, acct_logs=num_acct_logs,
daemon_logs=num_daemon_logs,
with_sudo=True) as snap_obj:
snap_dir = snap_obj.capture_server(True, True)
# Go through the snapshot and perform certain checks
# Check 1: the snapshot exists
self.assertTrue(os.path.isdir(snap_dir))
# Check 2: all directories except the 'server' directory have no
# files
svr_fullpath = os.path.join(snap_dir, "server")
for root, _, files in os.walk(snap_dir):
for filename in files:
file_fullpath = os.path.join(root, filename)
# Find the common paths between 'server' & the file
common_path = os.path.commonprefix([file_fullpath,
svr_fullpath])
self.assertEquals(os.path.basename(common_path), "server")
# Check 3: qstat_Bf.out exists
qstat_bf_out = os.path.join(snap_obj.snapdir, QSTAT_BF_PATH)
self.assertTrue(os.path.isfile(qstat_bf_out))
# Check 4: qstat_Bf.out has 'job_history_duration' set to 24:00:00
with open(qstat_bf_out, "r") as fd:
for line in fd:
if "job_history_duration" in line:
# Remove whitespaces
line = "".join(line.split())
# Split it up by '='
key_val = line.split("=")
self.assertEquals(key_val[1], job_hist_duration)
# Cleanup
if os.path.isdir(snap_dir):
self.du.rm(path=snap_dir, recursive=True, force=True)
def test_capture_all(self):
"""
Test the 'capture_all' interface of PBSSnapUtils
WARNING: Assumes that the test is being run on type - 1 PBS install
"""
num_daemon_logs = 2
num_acct_logs = 5
# Check that all PBS daemons are up and running
all_daemons_up = self.server.isUp()
all_daemons_up = all_daemons_up and self.mom.isUp()
all_daemons_up = all_daemons_up and self.comm.isUp()
all_daemons_up = all_daemons_up and self.scheduler.isUp()
if not all_daemons_up:
# Skip the test
self.skipTest("Type 1 installation not present or " +
"all daemons are not running")
with PBSSnapUtils(out_dir=self.parent_dir, acct_logs=num_acct_logs,
daemon_logs=num_daemon_logs,
with_sudo=True) as snap_obj:
snap_dir = snap_obj.capture_all()
snap_obj.finalize()
# Test that all the expected information has been captured
# PBSSnapUtils has various dictionaries which store metadata
# for various objects. Create a list of these dicts
all_info = [snap_obj.server_info, snap_obj.job_info,
snap_obj.node_info, snap_obj.comm_info,
snap_obj.hook_info, snap_obj.sched_info,
snap_obj.resv_info, snap_obj.core_info,
snap_obj.sys_info]
skip_list = [ACCT_LOGS, QMGR_LPBSHOOK_OUT, "reservation", "job",
QMGR_PR_OUT, PG_LOGS, "core_file_bt",
"pbs_snapshot.log"]
platform = self.du.get_platform()
if not platform.startswith("linux"):
skip_list.extend([ETC_HOSTS, ETC_NSSWITCH_CONF, LSOF_PBS_OUT,
VMSTAT_OUT, DF_H_OUT, DMESG_OUT])
for item_info in all_info:
for key, info in item_info.iteritems():
info_path = info[0]
if info_path is None:
continue
# Check if we should skip checking this info
skip_item = False
for item in skip_list:
if isinstance(item, int):
if item == key:
skip_item = True
break
else:
if item in info_path:
skip_item = True
break
if skip_item:
continue
# Check if this information was captured
info_full_path = os.path.join(snap_dir, info_path)
self.assertTrue(os.path.exists(info_full_path),
msg=info_full_path + " was not captured")
# Cleanup
if os.path.isdir(snap_dir):
self.du.rm(path=snap_dir, recursive=True, force=True)
def test_capture_pbs_logs(self):
"""
Test the 'capture_pbs_logs' interface of PBSSnapUtils
"""
num_daemon_logs = 2
num_acct_logs = 5
# Check which PBS daemons are up on this machine.
# We'll only check for logs from the daemons which were up
# when the snapshot was taken.
server_up = self.server.isUp()
mom_up = self.mom.isUp()
comm_up = self.comm.isUp()
sched_up = self.scheduler.isUp()
if not (server_up or mom_up or comm_up or sched_up):
# Skip the test
self.skipTest("No PBSPro daemons found on the system," +
" skipping the test")
with PBSSnapUtils(out_dir=self.parent_dir, acct_logs=num_acct_logs,
daemon_logs=num_daemon_logs,
with_sudo=True) as snap_obj:
snap_dir = snap_obj.capture_pbs_logs()
# Perform some checks
# Check that the snapshot exists
self.assertTrue(os.path.isdir(snap_dir))
if server_up:
# Check that 'server_logs' were captured
log_path = os.path.join(snap_dir, SVR_LOGS_PATH)
self.assertTrue(os.path.isdir(log_path))
# Check that 'accounting_logs' were captured
log_path = os.path.join(snap_dir, ACCT_LOGS_PATH)
self.assertTrue(os.path.isdir(log_path))
if mom_up:
# Check that 'mom_logs' were captured
log_path = os.path.join(snap_dir, MOM_LOGS_PATH)
self.assertTrue(os.path.isdir(log_path))
if comm_up:
# Check that 'comm_logs' were captured
log_path = os.path.join(snap_dir, COMM_LOGS_PATH)
self.assertTrue(os.path.isdir(log_path))
if sched_up:
# Check that 'sched_logs' were captured
log_path = os.path.join(snap_dir, DFLT_SCHED_LOGS_PATH)
self.assertTrue(os.path.isdir(log_path))
if os.path.isdir(snap_dir):
self.du.rm(path=snap_dir, recursive=True, force=True)
def test_snapshot_basic(self):
"""
Test capturing a snapshot via the pbs_snapshot program
"""
if self.pbs_snapshot_path is None:
self.skip_test("pbs_snapshot not found")
output_tar, _ = self.take_snapshot()
# Check that the output tarball was created
self.assertTrue(os.path.isfile(output_tar))
def test_snapshot_without_logs(self):
"""
Test capturing a snapshot via the pbs_snapshot program
Capture no logs
"""
if self.pbs_snapshot_path is None:
self.skip_test("pbs_snapshot not found")
(_, snap_dir) = self.take_snapshot(0, 0)
# Check that 'server_logs' were not captured
log_path = os.path.join(snap_dir, SVR_LOGS_PATH)
self.assertTrue(not os.path.isdir(log_path))
# Check that 'mom_logs' were not captured
log_path = os.path.join(snap_dir, MOM_LOGS_PATH)
self.assertTrue(not os.path.isdir(log_path))
# Check that 'comm_logs' were not captured
log_path = os.path.join(snap_dir, COMM_LOGS_PATH)
self.assertTrue(not os.path.isdir(log_path))
# Check that 'sched_logs' were not captured
log_path = os.path.join(snap_dir, DFLT_SCHED_LOGS_PATH)
self.assertTrue(not os.path.isdir(log_path))
# Check that 'accounting_logs' were not captured
log_path = os.path.join(snap_dir, ACCT_LOGS_PATH)
self.assertTrue(not os.path.isdir(log_path))
def test_obfuscate_resv_user_groups(self):
"""
Test obfuscation of user & group related attributes while capturing
snapshots via pbs_snapshot
"""
if self.pbs_snapshot_path is None:
self.skip_test("pbs_snapshot not found")
now = int(time.time())
# Let's submit a reservation with Authorized_Users and
# Authorized_Groups set
attribs = {ATTR_auth_u: TEST_USER1, ATTR_auth_g: TSTGRP0,
ATTR_l + ".ncpus": 2, 'reserve_start': now + 25,
'reserve_end': now + 45}
resv_obj = Reservation(attrs=attribs)
resv_id = self.server.submit(resv_obj)
attribs = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
self.server.expect(RESV, attribs, id=resv_id)
# Now, take a snapshot with --obfuscate
(_, snap_dir) = self.take_snapshot(0, 0, True)
# Make sure that the pbs_rstat -f output captured doesn't have the
# Authorized user and group names
pbsrstat_path = os.path.join(snap_dir, PBS_RSTAT_F_PATH)
self.assertTrue(os.path.isfile(pbsrstat_path))
with open(pbsrstat_path, "r") as rstatfd:
all_content = rstatfd.read()
self.assertFalse(str(TEST_USER1) in all_content)
self.assertFalse(str(TSTGRP0) in all_content)
def test_multisched_support(self):
"""
Test that pbs_snapshot can capture details of all schedulers
"""
if self.pbs_snapshot_path is None:
self.skip_test("pbs_snapshot not found")
# Setup 3 schedulers
sched_ids = ["sc1", "sc2", "sc3", "default"]
self.setup_sc(sched_ids[0], "P1", "15050")
self.setup_sc(sched_ids[1], "P2", "15051")
# Setup scheduler at non-default location
dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
if not os.path.exists(dir_path):
self.du.mkdir(path=dir_path, sudo=True)
sched_priv = os.path.join(dir_path, 'sched_priv_sc3')
sched_log = os.path.join(dir_path, 'sched_logs_sc3')
self.setup_sc(sched_ids[2], "P3", "15052", sched_priv, sched_log)
# Add 3 partitions, each associated with a queue and a node
(q_ids, _) = self.setup_queues_nodes(3)
# Submit some jobs to fill the system up and get the multiple
# schedulers busy
for q_id in q_ids:
for _ in range(2):
attr = {"queue": q_id, "Resource_List.ncpus": "1"}
j = Job(TEST_USER1, attrs=attr)
self.server.submit(j)
# Capture a snapshot of the system with multiple schedulers
(_, snapdir) = self.take_snapshot()
# Check that sched priv and sched logs for all schedulers was captured
for sched_id in sched_ids:
if (sched_id == "default"):
schedi_priv = os.path.join(snapdir, DFLT_SCHED_PRIV_PATH)
schedi_logs = os.path.join(snapdir, DFLT_SCHED_LOGS_PATH)
else:
schedi_priv = os.path.join(snapdir, "sched_priv_" + sched_id)
schedi_logs = os.path.join(snapdir, "sched_logs_" + sched_id)
self.assertTrue(os.path.isdir(schedi_priv))
self.assertTrue(os.path.isdir(schedi_logs))
# Make sure that these directories are not empty
self.assertTrue(len(os.listdir(schedi_priv)) > 0)
self.assertTrue(len(os.listdir(schedi_logs)) > 0)
# Check that qmgr -c "l sched" captured information about all scheds
lschedpath = os.path.join(snapdir, QMGR_LSCHED_PATH)
with open(lschedpath, "r") as fd:
scheds_found = 0
for line in fd:
if line.startswith("Sched "):
sched_id = line.split("Sched ")[1]
sched_id = sched_id.strip()
self.assertTrue(sched_id in sched_ids)
scheds_found += 1
self.assertEqual(scheds_found, 4)
def test_snapshot_from_hook(self):
"""
Test that pbs_snapshot can be called from inside a hook
"""
logmsg = "pbs_snapshot was successfully run"
hook_body = """
import pbs
import os
import subprocess
import time
pbs_snap_exec = os.path.join(pbs.pbs_conf['PBS_EXEC'], "sbin", "pbs_snapshot")
if not os.path.isfile(pbs_snap_exec):
raise ValueError("pbs_snapshot executable not found")
ref_time = time.time()
snap_cmd = [pbs_snap_exec, "-o", "."]
assert(not subprocess.call(snap_cmd))
# Check that the snapshot was captured
snapshot_found = False
for filename in os.listdir("."):
if filename.startswith("snapshot") and filename.endswith(".tgz"):
# Make sure the mtime on this file is recent enough
mtime_file = os.path.getmtime(filename)
if mtime_file > ref_time:
snapshot_found = True
break
assert(snapshot_found)
pbs.logmsg(pbs.EVENT_DEBUG,"%s")
""" % (logmsg)
hook_name = "snapshothook"
attr = {"event": "periodic", "freq": 5}
rv = self.server.create_import_hook(hook_name, attr, hook_body,
overwrite=True)
self.assertTrue(rv)
self.server.log_match(logmsg)
def snapshot_multi_mom_basic(self, obfuscate=False):
"""
Test capturing data from a multi-mom system
:param obfuscate: take snapshot with --obfuscate?
:type obfuscate: bool
"""
# Skip test if number of moms is not equal to two
if len(self.moms) != 2:
self.skipTest("test requires atleast two moms as input, "
"use -p moms=:")
mom1 = self.moms.values()[0]
mom2 = self.moms.values()[1]
host1 = mom1.shortname
host2 = mom2.shortname
self.server.manager(MGR_CMD_DELETE, NODE, None, "")
self.server.manager(MGR_CMD_CREATE, NODE, id=host1)
self.server.manager(MGR_CMD_CREATE, NODE, id=host2)
# Give the moms a chance to contact the server.
self.server.expect(NODE, {'state': 'free'}, id=host1)
self.server.expect(NODE, {'state': 'free'}, id=host2)
# Capture a snapshot with details from the remote moms
(_, snapdir) = self.take_snapshot(hosts=[host1, host2],
obfuscate=obfuscate)
# Check that snapshots for the 2 hosts were captured
host1_outtar = os.path.join(snapdir, host1 + "_snapshot.tgz")
host2_outtar = os.path.join(snapdir, host2 + "_snapshot.tgz")
self.assertTrue(os.path.isfile(host1_outtar),
"Failed to capture snapshot on %s" % (host1))
self.assertTrue(os.path.isfile(host2_outtar),
"Failed to capture snapshot on %s" % (host2))
# Unwrap the host snapshots
host1_snapdir = host1 + "_snapshot"
host2_snapdir = host2 + "_snapshot"
os.mkdir(host1_snapdir)
self.snapdirs.append(host1_snapdir)
os.mkdir(host2_snapdir)
self.snapdirs.append(host2_snapdir)
tar = tarfile.open(host1_outtar)
tar.extractall(path=host1_snapdir)
tar.close()
tar = tarfile.open(host2_outtar)
tar.extractall(path=host2_snapdir)
tar.close()
# Determine the name of the child snapshots
snap1_path = self.du.listdir(path=host1_snapdir, fullpath=True)
snap2_path = self.du.listdir(path=host2_snapdir, fullpath=True)
snap1_path = snap1_path[0]
snap2_path = snap2_path[0]
# Check that at least pbs.conf was captured on all of these hosts
self.assertTrue(os.path.isfile(os.path.join(snapdir, "pbs.conf")),
"Main snapshot didn't capture all expected"
" information")
self.assertTrue(os.path.isfile(os.path.join(snap1_path, "pbs.conf")),
"%s snapshot didn't capture all expected"
" information" % (host1))
self.assertTrue(os.path.isfile(os.path.join(snap2_path, "pbs.conf")),
"%s snapshot didn't capture all expected"
" information" % (host2))
def test_multi_mom_basic(self):
"""
Test running pbs_snapshot on a multi-mom setup
"""
self.snapshot_multi_mom_basic()
def test_multi_mom_basic_obfuscate(self):
"""
Test running pbs_snapshot on a multi-mom setup with obfuscation
"""
self.snapshot_multi_mom_basic(obfuscate=True)
def test_no_sudo(self):
"""
Test that running pbs_snapshot without sudo doesn't fail
"""
output_tar, _ = self.take_snapshot(with_sudo=False)
# Check that the output tarball was created
self.assertTrue(os.path.isfile(output_tar))
def test_snapshot_json(self):
"""
Test that pbs_snapshot captures job and vnode info in json
"""
_, snap_dir = self.take_snapshot()
# Verify that qstat json was captured
jsonpath = os.path.join(snap_dir, QSTAT_F_JSON_PATH)
self.assertTrue(os.path.isfile(jsonpath))
with open(jsonpath, "r") as fd:
json.load(fd) # this will fail if file is not a valid json
# Verify that pbsnodes json was captured
jsonpath = os.path.join(snap_dir, PBSNODES_AVFJSON_PATH)
self.assertTrue(os.path.isfile(jsonpath))
with open(jsonpath, "r") as fd:
json.load(fd)
@classmethod
def tearDownClass(self):
# Delete the snapshot directories and tarballs created
for snap_dir in self.snapdirs:
self.du.rm(path=snap_dir, recursive=True, force=True)
for snap_tar in self.snaptars:
self.du.rm(path=snap_tar, sudo=True, force=True)
TestFunctional.tearDownClass()