# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
import re
import time
import sys
import datetime
import logging
import traceback
import math
from subprocess import Popen, PIPE
from ptl.utils.pbs_dshutils import DshUtils
from ptl.lib.pbs_testlib import BatchUtils, Server, NODE, JOB, SET, EQ
from ptl.lib.pbs_testlib import ResourceResv
from ptl.utils.pbs_fileutils import FileUtils, FILE_TAIL
"""
Analyze ``server``, ``scheduler``, ``MoM``, and ``accounting`` logs.
- Scheduler log analysis:
Extraction of per cycle information including:
cycle start time
cycle duration
time to query objects from server
number of jobs considered
number of jobs run
number of jobs that failed to run
number of jobs preempted
number of jobs that failed to preempt
number of jobs calendared
time to determine that a job can run
time to determine that a job can not run
time spent calendaring
time spent in scheduler solver
Summary of all cycles information
- Server log analysis:
job submit rate
number of jobs ended
number of jobs run
job run rate
job end rate
job wait time distribution
PBS versions
node up rate
- Mom log analysis:
job submit rate
number of jobs ended
number of jobs run
job run rate
job end rate
PBS versions
- Accounting log analysis:
job submit rate
number of jobs ended
number of jobs run
job run rate
job submit rate
job end rate
job size (cpu and node) distribution
job wait time distribution
utilization
"""
tm_re = r'(?P<datetime>\d\d/\d\d/\d{4}\s\d\d:\d\d:\d\d(\.\d{6})?)'
job_re = r';(?P<jobid>[\d\[\d*\]]+)\.'
fail_re = r';(?P<jobid>[\d\[\[]+)\.'
# Server metrics
NUR = 'node_up_rate'
# Scheduler metrics
NC = 'num_cycles'
mCD = 'cycle_duration_min'
MCD = 'cycle_duration_max'
mCT = 'min_cycle_time'
MCT = 'max_cycle_time'
CDA = 'cycle_duration_mean'
CD25 = 'cycle_duration_25p'
CD50 = 'cycle_duration_median'
CD75 = 'cycle_duration_75p'
CST = 'cycle_start_time'
CD = 'cycle_duration'
QD = 'query_duration'
NJC = 'num_jobs_considered'
NJFR = 'num_jobs_failed_to_run'
SST = 'scheduler_solver_time'
NJCAL = 'num_jobs_calendared'
NJFP = 'num_jobs_failed_to_preempt'
NJP = 'num_jobs_preempted'
T2R = 'time_to_run'
T2D = 'time_to_discard'
TiS = 'time_in_sched'
TTC = 'time_to_calendar'
# Scheduling Estimated Start Time
EST = 'estimates'
EJ = 'estimated_jobs'
Eat = 'estimated'
DDm = 'drift_duration_min'
DDM = 'drift_duration_max'
DDA = 'drift_duration_mean'
DD50 = 'drift_duration_median'
ND = 'num_drifts'
NJD = 'num_jobs_drifted'
NJND = 'num_jobs_no_drift'
NEST = 'num_estimates'
JDD = 'job_drift_duration'
ESTR = 'estimated_start_time_range'
ESTA = 'estimated_start_time_accuracy'
JST = 'job_start_time'
ESTS = 'estimated_start_time_summary'
Ds15mn = 'drifted_sub_15mn'
Ds1hr = 'drifted_sub_1hr'
Ds3hr = 'drifted_sub_3hr'
Do3hr = 'drifted_over_3hr'
# Accounting metrics
JWTm = 'job_wait_time_min'
JWTM = 'job_wait_time_max'
JWTA = 'job_wait_time_mean'
JWT25 = 'job_wait_time_25p'
JWT50 = 'job_wait_time_median'
JWT75 = 'job_wait_time_75p'
JRTm = 'job_run_time_min'
JRT25 = 'job_run_time_25p'
JRT50 = 'job_run_time_median'
JRTA = 'job_run_time_mean'
JRT75 = 'job_run_time_75p'
JRTM = 'job_run_time_max'
JNSm = 'job_node_size_min'
JNS25 = 'job_node_size_25p'
JNS50 = 'job_node_size_median'
JNSA = 'job_node_size_mean'
JNS75 = 'job_node_size_75p'
JNSM = 'job_node_size_max'
JCSm = 'job_cpu_size_min'
JCS25 = 'job_cpu_size_25p'
JCS50 = 'job_cpu_size_median'
JCSA = 'job_cpu_size_mean'
JCS75 = 'job_cpu_size_75p'
JCSM = 'job_cpu_size_max'
CPH = 'cpu_hours'
NPH = 'node_hours'
USRS = 'unique_users'
UNCPUS = 'utilization_ncpus'
UNODES = 'utilization_nodes'
# Generic metrics
VER = 'pbs_version'
JID = 'job_id'
JRR = 'job_run_rate'
JSR = 'job_submit_rate'
JER = 'job_end_rate'
NJQ = 'num_jobs_queued'
NJR = 'num_jobs_run'
NJE = 'num_jobs_ended'
DUR = 'duration'
RI = 'custom_interval'
IT = 'init_time'
CF = 'custom_freq'
CFC = 'custom_freq_counts'
CG = 'custom_groups'
PARSER_OK_CONTINUE = 0
PARSER_OK_STOP = 1
PARSER_ERROR_CONTINUE = 2
PARSER_ERROR_STOP = 3
epoch_datetime = datetime.datetime.fromtimestamp(0)
class PBSLogUtils(object):
"""
Miscellaneous utilities to process log files
"""
logger = logging.getLogger(__name__)
du = DshUtils()
@classmethod
def convert_date_time(cls, dt=None, fmt=None):
"""
convert a date time string of the form given by fmt into
number of seconds since epoch (with possible microseconds)
:param dt: the datetime string to convert
:type dt: str or None
:param fmt: Format of the datetime string; defaults to the PBS log timestamp format
:type fmt: str
:returns: Seconds since epoch (int, or float when microseconds are present), or None if conversion fails
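
Example (hypothetical timestamp, using the default log format)::

    PBSLogUtils.convert_date_time('03/15/2018 12:30:45')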
"""
if dt is None:
return None
micro = False
if fmt is None:
if '.' in dt:
micro = True
fmt = "%m/%d/%Y %H:%M:%S.%f"
else:
fmt = "%m/%d/%Y %H:%M:%S"
try:
# Get datetime object
t = datetime.datetime.strptime(dt, fmt)
# Get timedelta object of epoch time
t -= epoch_datetime
# get epoch time from timedelta object
tm = t.total_seconds()
except:
cls.logger.debug("could not convert date time: " + str(dt))
return None
if micro is True:
return tm
else:
return int(tm)
def get_num_lines(self, log, hostname=None, sudo=False):
"""
Get the number of lines of particular log
:param log: the log file name
:type log: str
"""
f = self.open_log(log, hostname, sudo=sudo)
nl = sum([1 for _ in f])
f.close()
return nl
def open_log(self, log, hostname=None, sudo=False):
"""
:param log: the log file name to read from
:type log: str
:param hostname: the hostname from which to read the file
:type hostname: str or None
:param sudo: Whether to access log file as a privileged user.
:type sudo: boolean
:returns: A file instance
"""
try:
if hostname is None or self.du.is_localhost(hostname):
if sudo:
cmd = ['sudo', 'cat', log]
self.logger.info('running ' + " ".join(cmd))
p = Popen(cmd, stdout=PIPE)
f = p.stdout
else:
f = open(log)
else:
cmd = ['ssh', hostname]
if sudo:
cmd += ['sudo']
cmd += ['cat', log]
self.logger.debug('running ' + " ".join(cmd))
p = Popen(cmd, stdout=PIPE)
f = p.stdout
except:
traceback.print_exc()
self.logger.error('Problem processing file ' + log)
f = None
return f
def get_timestamps(self, logfile=None, hostname=None, num=None,
sudo=False):
"""
Helper function to parse logfile
:returns: Each timestamp in a list as number of seconds since epoch
"""
if logfile is None:
return
records = self.open_log(logfile, hostname, sudo=sudo)
if records is None:
return
rec_times = []
tm_tag = re.compile(tm_re)
num_rec = 0
for record in records:
num_rec += 1
if num is not None and num_rec > num:
break
m = tm_tag.match(record)
if m:
rec_times.append(
self.convert_date_time(m.group('datetime')))
records.close()
return rec_times
def match_msg(self, lines, msg, allmatch=False, regexp=False,
starttime=None, endtime=None):
"""
Returns a tuple (x, y) where x is the line index and y is the
matching line, or None if no match is found.
:param allmatch: If True (False by default), return a list
of matching tuples.
:type allmatch: boolean
:param regexp: If True, msg is a Python regular expression.
Defaults to False.
:type regexp: bool
:param starttime: If set ignore matches that occur before
specified time
:param endtime: If set ignore matches that occur after
specified time
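
Example (a sketch; ``lines`` is assumed to be an open log file
or a list of log records)::

    lu = PBSLogUtils()
    matches = lu.match_msg(lines, msg='Exit_status', allmatch=True)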
"""
linecount = 0
ret = []
if lines:
for l in lines:
if starttime is not None:
# l[:19] captures the log record time
tm = self.convert_date_time(l[:19])
if tm is None or tm < starttime:
continue
if endtime is not None:
# l[:19] captures the log record time
tm = self.convert_date_time(l[:19])
if tm is None or tm > endtime:
continue
if ((regexp and re.search(msg, l)) or
(not regexp and l.find(msg) != -1)):
m = (linecount, l)
if allmatch:
ret.append(m)
else:
return m
linecount += 1
if len(ret) > 0:
return ret
return None
@staticmethod
def convert_resv_date_time(date_time):
"""
Convert reservation datetime to seconds
"""
try:
t = time.strptime(date_time, "%a %b %d %H:%M:%S %Y")
except:
t = time.localtime()
return int(time.mktime(t))
@staticmethod
def convert_hhmmss_time(tm):
"""
Convert datetime in hhmmss format to seconds
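
Example::

    PBSLogUtils.convert_hhmmss_time('01:30:00')  # 5400 seconds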
"""
if ':' not in tm:
return tm
hms = tm.split(':')
return int(int(hms[0]) * 3600 + int(hms[1]) * 60 + int(hms[2]))
def get_rate(self, l=[]):
"""
:returns: The rate of occurrences in list ``l``, expressed per
second, minute, hour, or day. The list is expected to be sorted.
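
Example (epoch timestamps)::

    PBSLogUtils().get_rate([0, 1, 2, 3])  # '1.33/s'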
"""
if len(l) > 0:
duration = l[len(l) - 1] - l[0]
if duration > 0:
tm_factor = [1, 60, 60, 24]
_rate = float(len(l)) / float(duration)
index = 0
while _rate < 1 and index < len(tm_factor) - 1:
index += 1
_rate *= tm_factor[index]
_rate = "%.2f" % (_rate)
if index == 0:
_rate = str(_rate) + '/s'
elif index == 1:
_rate = str(_rate) + '/mn'
elif index == 2:
_rate = str(_rate) + '/hr'
else:
_rate = str(_rate) + '/day'
else:
_rate = str(len(l)) + '/s'
return _rate
return 0
def in_range(self, tm, start=None, end=None):
"""
:param tm: time to check within a provided range
:param start: Lower limit for the time range
:param end: Higher limit for the time range
:returns: True if time is in the range else return False
"""
if start is None and end is None:
return True
if start is None and end is not None:
if tm <= end:
return True
else:
return False
if start is not None and end is None:
if tm >= start:
return True
else:
return False
else:
if tm >= start and tm <= end:
return True
return False
@staticmethod
def _duration(val=None):
if val is not None:
return str(datetime.timedelta(seconds=int(val)))
@staticmethod
def get_day(tm=None):
"""
:param tm: Time for which to get a day
"""
if tm is None:
tm = time.time()
return time.strftime("%Y%m%d", time.localtime(tm))
@staticmethod
def percentile(N, percent):
"""
Find the percentile of a list of values.
:param N: A list of values. Note N MUST BE already sorted.
:type N: List
:param percent: A float value from 0.0 to 1.0.
:type percent: Float
:returns: The percentile of the values
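
Example::

    PBSLogUtils.percentile([1, 2, 3, 4], 0.5)  # 2.5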
"""
if not N:
return None
k = (len(N) - 1) * percent
f = math.floor(k)
c = math.ceil(k)
if f == c:
return N[int(k)]
d0 = N[int(f)] * (c - k)
d1 = N[int(c)] * (k - f)
return d0 + d1
@staticmethod
def process_intervals(intervals, groups, frequency=60):
"""
Compute the deltas between consecutive timestamps and, when a
frequency is given, group the timestamps into windows of at most
``frequency`` seconds, counting occurrences per window
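
Example (four epoch timestamps, default 60s window)::

    PBSLogUtils.process_intervals([0, 10, 200, 210], None)
    # {'custom_interval': '10, 190, 10', 'init_time': 0,
    #  'custom_freq_counts': [((0, 10), 2), ((200, 210), 2)],
    #  'custom_freq': 60}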
"""
info = {}
if not intervals:
return info
val = [x - intervals[i - 1] for i, x in enumerate(intervals) if i > 0]
info[RI] = ", ".join(map(lambda v: str(v), val))
if intervals:
info[IT] = intervals[0]
if frequency is not None:
_cf = []
j = 0
i = 1
while i < len(intervals):
if (intervals[i] - intervals[j]) > frequency:
_cf.append(((intervals[j], intervals[i - 1]), i - j))
j = i
i += 1
if i != j + 1:
_cf.append(((intervals[j], intervals[i - 1]), i - j))
else:
_cf.append(((intervals[j], intervals[j]), 1))
info[CFC] = _cf
info[CF] = frequency
if groups:
info[CG] = groups
return info
def get_log_files(self, hostname, path, start, end, sudo=False):
"""
:param hostname: Hostname of the machine
:type hostname: str
:param path: Path for the log file
:type path: str
:param start: Start time for the log file
:param end: End time for the log file
:returns: list of log file(s) found or an empty list
"""
paths = []
if self.du.isdir(hostname, path, sudo=sudo):
logs = self.du.listdir(hostname, path, sudo=sudo)
for f in sorted(logs):
if start is not None or end is not None:
tm = self.get_timestamps(f, hostname, num=1, sudo=sudo)
if not tm:
continue
d1 = time.strftime("%Y%m%d", time.localtime(tm[0]))
if start is not None:
d2 = time.strftime("%Y%m%d", time.localtime(start))
if d1 < d2:
continue
if end is not None:
d2 = time.strftime("%Y%m%d", time.localtime(end))
if d1 > d2:
continue
paths.append(f)
elif self.du.isfile(hostname, path, sudo=sudo):
paths = [path]
return paths
class PBSLogAnalyzer(object):
"""
Utility to analyze the PBS logs
"""
logger = logging.getLogger(__name__)
logutils = PBSLogUtils()
generic_tag = re.compile(tm_re + ".*")
node_type_tag = re.compile(tm_re + ".*" + "Type 58 request.*")
queue_type_tag = re.compile(tm_re + ".*" + "Type 20 request.*")
job_type_tag = re.compile(tm_re + ".*" + "Type 51 request.*")
job_exit_tag = re.compile(tm_re + ".*" + job_re + ";Exit_status.*")
def __init__(self, schedlog=None, serverlog=None,
momlog=None, acctlog=None, genericlog=None,
hostname=None, show_progress=False):
self.hostname = hostname
self.schedlog = schedlog
self.serverlog = serverlog
self.acctlog = acctlog
self.momlog = momlog
self.genericlog = genericlog
self.show_progress = show_progress
self._custom_tag = None
self._custom_freq = None
self._custom_id = False
self._re_interval = []
self._re_group = {}
self.num_conditional_matches = 0
self.re_conditional = None
self.num_conditionals = 0
self.prev_records = []
self.info = {}
self.scheduler = None
self.server = None
self.mom = None
self.accounting = None
if schedlog:
self.scheduler = PBSSchedulerLog(schedlog, hostname, show_progress)
if serverlog:
self.server = PBSServerLog(serverlog, hostname, show_progress)
if momlog:
self.mom = PBSMoMLog(momlog, hostname, show_progress)
if acctlog:
self.accounting = PBSAccountingLog(acctlog, hostname,
show_progress)
def set_custom_match(self, pattern, frequency=None):
"""
Set the custom matching
:param pattern: Matching pattern
:type pattern: str
:param frequency: Frequency of match
:type frequency: int
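
Example (hypothetical pattern and log path)::

    la = PBSLogAnalyzer()
    la.set_custom_match('Type 19 request', frequency=60)
    info = la.analyze('/var/spool/pbs/server_logs/20180101', sudo=True)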
"""
self._custom_tag = re.compile(tm_re + ".*" + pattern + ".*")
self._custom_freq = frequency
def set_conditional_match(self, conditions):
"""
Set the conditional match
:param conditions: Conditions for matching
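
Example (the two expressions are matched against consecutive
records; see :py:meth:`parse_conditional`)::

    la.set_conditional_match(['Type 20 request received from Scheduler',
                              'Exit_status'])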
"""
if not isinstance(conditions, list):
return False
self.re_conditional = conditions
self.num_conditionals = len(conditions)
self.prev_records = map(lambda n: '', range(self.num_conditionals))
self.info['matches'] = []
def analyze_scheduler_log(self, filename=None, start=None, end=None,
hostname=None, summarize=True):
"""
Analyze the scheduler log
:param filename: Scheduler log file name
:type filename: str or None
:param start: Time from which log to be analyzed
:param end: Time till which log to be analyzed
:param hostname: Hostname of the machine
:type hostname: str or None
:param summarize: Summarize data parsed if True else not
:type summarize: bool
"""
if self.scheduler is None:
self.scheduler = PBSSchedulerLog(filename, hostname=hostname)
return self.scheduler.analyze(filename, start, end, hostname,
summarize)
def analyze_server_log(self, filename=None, start=None, end=None,
hostname=None, summarize=True):
"""
Analyze the server log
"""
if self.server is None:
self.server = PBSServerLog(filename, hostname=hostname)
return self.server.analyze(filename, start, end, hostname,
summarize)
def analyze_accounting_log(self, filename=None, start=None, end=None,
hostname=None, summarize=True):
"""
Analyze the accounting log
"""
if self.accounting is None:
self.accounting = PBSAccountingLog(filename, hostname=hostname)
return self.accounting.analyze(filename, start, end, hostname,
summarize=summarize, sudo=True)
def analyze_mom_log(self, filename=None, start=None, end=None,
hostname=None, summarize=True):
"""
Analyze the mom log
"""
if self.mom is None:
self.mom = PBSMoMLog(filename, hostname=hostname)
return self.mom.analyze(filename, start, end, hostname, summarize)
def parse_conditional(self, rec, start, end):
"""
Match a sequence of regular expressions against multiple
consecutive lines in a generic log. Calculate the number
of conditional matching lines.
Example usage: to find the number of times the scheduler
stat'ing the server causes the scheduler to miss jobs ending,
which could possibly indicate a race condition between the
view of resources assigned to nodes and the actual jobs
running, one would call this function by setting
re_conditional to
``['Type 20 request received from Scheduler', 'Exit_status']``,
which can be read as counting the number of times that the
Type 20 message is preceded by an ``Exit_status`` message
"""
match = True
for rc in range(self.num_conditionals):
if not re.search(self.re_conditional[rc], self.prev_records[rc]):
match = False
if match:
self.num_conditional_matches += 1
self.info['matches'].extend(self.prev_records)
for i in range(self.num_conditionals - 1, -1, -1):
self.prev_records[i] = self.prev_records[i - 1]
self.prev_records[0] = rec
return PARSER_OK_CONTINUE
def parse_custom_tag(self, rec, start, end):
m = self._custom_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
if ((start is None and end is None) or
self.logutils.in_range(tm, start, end)):
self._re_interval.append(tm)
for k, v in m.groupdict().items():
if k in self._re_group:
self._re_group[k].append(v)
else:
self._re_group[k] = [v]
elif end is not None and tm > end:
return PARSER_OK_STOP
return PARSER_OK_CONTINUE
def parse_block(self, rec, start, end):
m = self.generic_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
if ((start is None and end is None) or
self.logutils.in_range(tm, start, end)):
print rec,
def comp_analyze(self, rec, start, end):
if self.re_conditional is not None:
return self.parse_conditional(rec, start, end)
elif self._custom_tag is not None:
return self.parse_custom_tag(rec, start, end)
elif start is not None or end is not None:
return self.parse_block(rec, start, end)
def analyze(self, path=None, start=None, end=None, hostname=None,
summarize=True, sudo=False):
"""
Parse any log file. This method is not ``context-specific``
to each log file type.
:param path: name of ``file/dir`` to parse
:type path: str or None
:param start: optional record time at which to start analyzing
:param end: optional record time after which to stop analyzing
:param hostname: name of host on which to operate. Defaults to
localhost
:type hostname: str or None
:param summarize: if True, summarize data parsed. Defaults to
True.
:type summarize: bool
:param sudo: If True, access log file(s) as privileged user.
:type sudo: bool
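
Example (hypothetical path and epoch range; with no custom or
conditional match set, records in range are printed)::

    la = PBSLogAnalyzer()
    la.analyze('/var/spool/pbs/sched_logs/20180101',
               start=1514764800, end=1514768400, sudo=True)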
"""
if hostname is None and self.hostname is not None:
hostname = self.hostname
for f in self.logutils.get_log_files(hostname, path, start, end,
sudo=sudo):
self._log_parser(f, start, end, hostname, sudo=sudo)
if summarize:
return self.summary()
def _log_parser(self, filename, start, end, hostname=None, sudo=False):
if filename is not None:
records = self.logutils.open_log(filename, hostname, sudo=sudo)
else:
return None
if records is None:
return None
num_records = self.logutils.get_num_lines(filename, hostname,
sudo=sudo)
num_line = 0
last_rec = None
if self.show_progress:
perc_range = range(10, 110, 10)
perc_records = map(lambda x: num_records * x / 100, perc_range)
sys.stderr.write('Parsing ' + filename + ': |0%')
sys.stderr.flush()
for rec in records:
num_line += 1
if self.show_progress and (num_line > perc_records[0]):
sys.stderr.write('-' + str(perc_range[0]) + '%')
sys.stderr.flush()
perc_range.remove(perc_range[0])
perc_records.remove(perc_records[0])
last_rec = rec
rv = self.comp_analyze(rec, start, end)
if (rv in (PARSER_OK_STOP, PARSER_ERROR_STOP) or
(self.show_progress and len(perc_records) == 0)):
break
if self.show_progress:
sys.stderr.write('-100%|\n')
sys.stderr.flush()
records.close()
if last_rec is not None:
self.epilogue(last_rec)
def analyze_logs(self, schedlog=None, serverlog=None, momlog=None,
acctlog=None, genericlog=None, start=None, end=None,
hostname=None, showjob=False):
"""
Analyze the scheduler, server, MoM, accounting, and/or generic
logs, cross-referencing scheduler job runs with the server's
'Job Run' records to compute in-scheduler durations
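
Example (hypothetical log paths)::

    la = PBSLogAnalyzer()
    info = la.analyze_logs(schedlog='/var/spool/pbs/sched_logs/20180101',
                           serverlog='/var/spool/pbs/server_logs/20180101')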
"""
if hostname is None and self.hostname is not None:
hostname = self.hostname
if schedlog is None and self.schedlog is not None:
schedlog = self.schedlog
if serverlog is None and self.serverlog is not None:
serverlog = self.serverlog
if momlog is None and self.momlog is not None:
momlog = self.momlog
if acctlog is None and self.acctlog is not None:
acctlog = self.acctlog
if genericlog is None and self.genericlog is not None:
genericlog = self.genericlog
cycles = None
sjr = {}
if schedlog:
self.analyze_scheduler_log(schedlog, start, end, hostname,
summarize=False)
cycles = self.scheduler.cycles
if serverlog:
self.analyze_server_log(serverlog, start, end, hostname,
summarize=False)
sjr = self.server.server_job_run
if momlog:
self.analyze_mom_log(momlog, start, end, hostname,
summarize=False)
if acctlog:
self.analyze_accounting_log(acctlog, start, end, hostname,
summarize=False)
if genericlog:
self.analyze(genericlog, start, end, hostname, sudo=True,
summarize=False)
if cycles is not None and len(sjr.keys()) != 0:
for cycle in cycles:
for jid, tm in cycle.sched_job_run.items():
# skip job arrays: scheduler runs a subjob
# but we don't keep track of which Considering job to run
# message it is associated with because the consider
# message doesn't show the subjob
if '[' in jid:
continue
if jid in sjr:
for tm in sjr[jid]:
if tm > cycle.start and tm < cycle.end:
cycle.inschedduration[jid] = \
tm - cycle.consider[jid]
return self.summary(showjob)
def epilogue(self, line):
pass
def summary(self, showjob=False, writer=None):
info = {}
if self._custom_tag is not None:
self.info = self.logutils.process_intervals(self._re_interval,
self._re_group,
self._custom_freq)
return self.info
if self.re_conditional is not None:
self.info['num_conditional_matches'] = self.num_conditional_matches
return self.info
if self.scheduler is not None:
info['scheduler'] = self.scheduler.summary(self.scheduler.cycles,
showjob)
if self.server is not None:
info['server'] = self.server.summary()
if self.accounting is not None:
info['accounting'] = self.accounting.summary()
if self.mom is not None:
info['mom'] = self.mom.summary()
return info
class PBSServerLog(PBSLogAnalyzer):
"""
:param filename: Server log filename
:type filename: str or None
:param hostname: Hostname of the machine
:type hostname: str or None
"""
tm_tag = re.compile(tm_re)
server_run_tag = re.compile(tm_re + ".*" + job_re + ".*;Job Run at.*")
server_nodeup_tag = re.compile(tm_re + ".*Node;.*;node up.*")
server_enquejob_tag = re.compile(tm_re + ".*" + job_re +
".*enqueuing into.*state 1 .*")
server_endjob_tag = re.compile(tm_re + ".*" + job_re +
".*;Exit_status.*")
def __init__(self, filename=None, hostname=None, show_progress=False):
self.server_job_queued = {}
self.server_job_run = {}
self.server_job_end = {}
self.records = None
self.nodeup = []
self.enquejob = []
self.record_tm = []
self.jobsrun = []
self.jobsend = []
self.wait_time = []
self.run_time = []
self.hostname = hostname
self.info = {}
self.version = []
self.filename = filename
self.show_progress = show_progress
def parse_runjob(self, line):
"""
Parse server log for run job records.
For each record keep track of the job id, and time in a
dedicated array
"""
m = self.server_run_tag.match(line)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.jobsrun.append(tm)
jobid = str(m.group('jobid'))
if jobid in self.server_job_run:
self.server_job_run[jobid].append(tm)
else:
self.server_job_run[jobid] = [tm]
if jobid in self.server_job_queued:
self.wait_time.append(tm - self.server_job_queued[jobid])
def parse_endjob(self, line):
"""
Parse server log for end job records.
For each record keep track of the job id, and time in a
dedicated array
"""
m = self.server_endjob_tag.match(line)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.jobsend.append(tm)
jobid = str(m.group('jobid'))
if jobid in self.server_job_end:
self.server_job_end[jobid].append(tm)
else:
self.server_job_end[jobid] = [tm]
if jobid in self.server_job_run:
self.run_time.append(tm - self.server_job_run[jobid][-1])
def parse_nodeup(self, line):
"""
Parse server log for nodes that are up
"""
m = self.server_nodeup_tag.match(line)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.nodeup.append(tm)
def parse_enquejob(self, line):
"""
Parse server log for enqueued jobs
"""
m = self.server_enquejob_tag.match(line)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.enquejob.append(tm)
jobid = str(m.group('jobid'))
self.server_job_queued[jobid] = tm
def comp_analyze(self, rec, start=None, end=None):
m = self.tm_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.record_tm.append(tm)
if not self.logutils.in_range(tm, start, end):
if end and tm > end:
return PARSER_OK_STOP
return PARSER_OK_CONTINUE
if 'pbs_version=' in rec:
version = rec.split('pbs_version=')[1].strip()
if version not in self.version:
self.version.append(version)
self.parse_enquejob(rec)
self.parse_nodeup(rec)
self.parse_runjob(rec)
self.parse_endjob(rec)
return PARSER_OK_CONTINUE
def summary(self):
self.info[JSR] = self.logutils.get_rate(self.enquejob)
self.info[NJE] = len(self.server_job_end.keys())
self.info[NJQ] = len(self.enquejob)
self.info[NUR] = self.logutils.get_rate(self.nodeup)
self.info[JRR] = self.logutils.get_rate(self.jobsrun)
self.info[JER] = self.logutils.get_rate(self.jobsend)
if len(self.wait_time) > 0:
wt = sorted(self.wait_time)
wta = float(sum(self.wait_time)) / len(self.wait_time)
self.info[JWTm] = self.logutils._duration(min(wt))
self.info[JWTM] = self.logutils._duration(max(wt))
self.info[JWTA] = self.logutils._duration(wta)
self.info[JWT25] = self.logutils._duration(
self.logutils.percentile(wt, .25))
self.info[JWT50] = self.logutils._duration(
self.logutils.percentile(wt, .5))
self.info[JWT75] = self.logutils._duration(
self.logutils.percentile(wt, .75))
njr = 0
for v in self.server_job_run.values():
njr += len(v)
self.info[NJR] = njr
self.info[VER] = ",".join(self.version)
if len(self.run_time) > 0:
rt = sorted(self.run_time)
self.info[JRTm] = self.logutils._duration(min(rt))
self.info[JRT25] = self.logutils._duration(
self.logutils.percentile(rt, 0.25))
self.info[JRT50] = self.logutils._duration(
self.logutils.percentile(rt, 0.50))
self.info[JRTA] = self.logutils._duration(
str(sum(rt) / len(rt)))
self.info[JRT75] = self.logutils._duration(
self.logutils.percentile(rt, 0.75))
self.info[JRTM] = self.logutils._duration(max(rt))
return self.info
class JobEstimatedStartTimeInfo(object):
"""
Information regarding Job estimated start time
"""
def __init__(self, jobid):
self.jobid = jobid
self.started_at = None
self.estimated_at = []
self.num_drifts = 0
self.num_estimates = 0
self.drift_time = 0
def add_estimate(self, tm):
"""
Add a job's new estimated start time
If the new estimate is later than the previous one, the
difference is added to the drift time. If the new estimate is
pulled earlier, it does not contribute to the drift time.
Drift time is a measure of the ``"negative perception"`` that
comes with a job being estimated to run at a later date than
previously ``"advertised"``.
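
Example (hypothetical epoch timestamps)::

    e = JobEstimatedStartTimeInfo('1234')
    e.add_estimate(1000)
    e.add_estimate(1300)  # pushed 300s later: drift_time becomes 300
    e.add_estimate(1200)  # pulled earlier: drift_time unchanged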
"""
if self.estimated_at:
prev_tm = self.estimated_at[len(self.estimated_at) - 1]
if tm > prev_tm:
self.num_drifts += 1
self.drift_time += tm - prev_tm
self.estimated_at.append(tm)
self.num_estimates += 1
def __repr__(self):
estimated_at_str = map(lambda t: str(t), self.estimated_at)
return " ".join([str(self.jobid), 'started: ', str(self.started_at),
'estimated: ', ",".join(estimated_at_str)])
def __str__(self):
return self.__repr__()
class PBSSchedulerLog(PBSLogAnalyzer):
tm_tag = re.compile(tm_re)
startcycle_tag = re.compile(tm_re + ".*Starting Scheduling.*")
endcycle_tag = re.compile(tm_re + ".*Leaving [(the )]*[sS]cheduling.*")
alarm_tag = re.compile(tm_re + ".*alarm.*")
considering_job_tag = re.compile(tm_re + ".*" + job_re +
".*;Considering job to run.*")
sched_job_run_tag = re.compile(tm_re + ".*" + job_re + ".*;Job run.*")
estimated_tag = re.compile(tm_re + ".*" + job_re +
".*;Job is a top job and will run at "
"(?P.*)")
run_failure_tag = re.compile(tm_re + ".*" + fail_re + ".*;Failed to run.*")
calendarjob_tag = re.compile(
tm_re +
".*" +
job_re +
".*;Job is a top job.*")
preempt_failure_tag = re.compile(tm_re + ".*;Job failed to be preempted.*")
preempt_tag = re.compile(tm_re + ".*" + job_re + ".*;Job preempted.*")
record_tag = re.compile(tm_re + ".*")
def __init__(self, filename=None, hostname=None, show_progress=False):
self.filename = filename
self.hostname = hostname
self.show_progress = show_progress
self.record_tm = []
self.version = []
self.cycle = None
self.cycles = []
self.estimated_jobs = {}
self.estimated_parsing_enabled = False
self.parse_estimated_only = False
self.info = {}
self.summary_info = {}
def _parse_line(self, line):
"""
Parse scheduling cycle Starting, Leaving, and alarm records
From each record, keep track of the record time in a
dedicated array
"""
m = self.startcycle_tag.match(line)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
# if cycle was interrupted assume previous cycle ended now
if self.cycle is not None and self.cycle.end == -1:
self.cycle.end = tm
self.cycle = PBSCycleInfo()
self.cycles.append(self.cycle)
self.cycle.start = tm
self.cycle.end = -1
return PARSER_OK_CONTINUE
m = self.endcycle_tag.match(line)
if m is not None and self.cycle is not None:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.cycle.end = tm
self.cycle.duration = tm - self.cycle.start
if (self.cycle.lastjob is not None and
self.cycle.lastjob not in self.cycle.sched_job_run and
self.cycle.lastjob not in self.cycle.calendared_jobs):
self.cycle.cantrunduration[self.cycle.lastjob] = (
tm - self.cycle.consider[self.cycle.lastjob])
return PARSER_OK_CONTINUE
m = self.alarm_tag.match(line)
if m is not None and self.cycle is not None:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.cycle.end = tm
return PARSER_OK_CONTINUE
m = self.considering_job_tag.match(line)
if m is not None and self.cycle is not None:
self.cycle.num_considered += 1
jid = str(m.group('jobid'))
tm = self.logutils.convert_date_time(m.group('datetime'))
self.cycle.consider[jid] = tm
self.cycle.political_order.append(jid)
if (self.cycle.lastjob is not None and
self.cycle.lastjob not in self.cycle.sched_job_run and
self.cycle.lastjob not in self.cycle.calendared_jobs):
self.cycle.cantrunduration[self.cycle.lastjob] = (
tm - self.cycle.consider[self.cycle.lastjob])
self.cycle.lastjob = jid
if self.cycle.queryduration == 0:
self.cycle.queryduration = tm - self.cycle.start
return PARSER_OK_CONTINUE
m = self.sched_job_run_tag.match(line)
if m is not None and self.cycle is not None:
jid = str(m.group('jobid'))
tm = self.logutils.convert_date_time(m.group('datetime'))
self.cycle.sched_job_run[jid] = tm
# job arrays require special handling because the considering
# job to run message does not have the subjob index but only []
if '[' in jid:
subjid = jid
if subjid not in self.cycle.consider:
jid = jid.split('[')[0] + '[]'
self.cycle.consider[subjid] = self.cycle.consider[jid]
self.cycle.runduration[subjid] = tm - self.cycle.consider[jid]
# jobs rerun due to a preemption failure aren't considered, skip
elif jid in self.cycle.consider:
self.cycle.runduration[jid] = tm - self.cycle.consider[jid]
return PARSER_OK_CONTINUE
m = self.run_failure_tag.match(line)
if m is not None:
if self.cycle is not None:
jid = str(m.group('jobid'))
tm = self.logutils.convert_date_time(m.group('datetime'))
self.cycle.run_failure[jid] = tm
return PARSER_OK_CONTINUE
m = self.preempt_failure_tag.match(line)
if m is not None:
if self.cycle is not None:
self.cycle.num_preempt_failure += 1
return PARSER_OK_CONTINUE
m = self.preempt_tag.match(line)
if m is not None:
if self.cycle is not None:
jid = str(m.group('jobid'))
if self.cycle.lastjob in self.cycle.preempted_jobs:
self.cycle.preempted_jobs[self.cycle.lastjob].append(jid)
else:
self.cycle.preempted_jobs[self.cycle.lastjob] = [jid]
self.cycle.num_preempted += 1
return PARSER_OK_CONTINUE
m = self.calendarjob_tag.match(line)
if m is not None:
if self.cycle is not None:
jid = str(m.group('jobid'))
tm = self.logutils.convert_date_time(m.group('datetime'))
self.cycle.calendared_jobs[jid] = tm
if jid in self.cycle.consider:
self.cycle.calendarduration[jid] = \
(tm - self.cycle.consider[jid])
elif '[' in jid:
arrjid = re.sub("(\[\d+\])", '[]', jid)
if arrjid in self.cycle.consider:
self.cycle.consider[jid] = self.cycle.consider[arrjid]
self.cycle.calendarduration[jid] = \
(tm - self.cycle.consider[arrjid])
return PARSER_OK_CONTINUE
def get_cycles(self, start=None, end=None):
"""
Get the scheduler cycles
:param start: Start time
:param end: End time
:returns: Scheduling cycles
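
Example (hypothetical path and epoch start time)::

    sched = PBSSchedulerLog('/var/spool/pbs/sched_logs/20180101')
    sched.analyze('/var/spool/pbs/sched_logs/20180101', summarize=False)
    cycles = sched.get_cycles(start=1514764800)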
"""
if start is None and end is None:
return self.cycles
cycles = []
if end is None:
end = int(time.time())
for c in self.cycles:
if c.start >= start and c.end < end:
cycles.append(c)
return cycles
def comp_analyze(self, rec, start, end):
if self.estimated_parsing_enabled:
rv = self.estimated_info_parsing(rec)
if self.parse_estimated_only:
return rv
return self.scheduler_parsing(rec, start, end)
def scheduler_parsing(self, rec, start, end):
m = self.tm_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
self.record_tm.append(tm)
if self.logutils.in_range(tm, start, end):
rv = self._parse_line(rec)
if rv in (PARSER_OK_STOP, PARSER_ERROR_STOP):
return rv
if 'pbs_version=' in rec:
version = rec.split('pbs_version=')[1].strip()
if version not in self.version:
self.version.append(version)
elif end is not None and tm > end:
return PARSER_OK_STOP
return PARSER_OK_CONTINUE
def estimated_info_parsing(self, line):
"""
Parse Estimated start time information for a job
"""
m = self.sched_job_run_tag.match(line)
if m is not None:
jid = str(m.group('jobid'))
tm = self.logutils.convert_date_time(m.group('datetime'))
if jid in self.estimated_jobs:
self.estimated_jobs[jid].started_at = tm
else:
ej = JobEstimatedStartTimeInfo(jid)
ej.started_at = tm
self.estimated_jobs[jid] = ej
m = self.estimated_tag.match(line)
if m is not None:
jid = str(m.group('jobid'))
try:
tm = self.logutils.convert_date_time(m.group('est_tm'),
"%a %b %d %H:%M:%S %Y")
except:
logging.error('error converting time: ' +
str(m.group('est_tm')))
return PARSER_ERROR_STOP
if jid in self.estimated_jobs:
self.estimated_jobs[jid].add_estimate(tm)
else:
ej = JobEstimatedStartTimeInfo(jid)
ej.add_estimate(tm)
self.estimated_jobs[jid] = ej
return PARSER_OK_CONTINUE
def epilogue(self, line):
# if the log ends in the middle of a cycle there is no 'Leaving cycle'
# message; in that case the last cycle's end is set to the time of the
# last record in the log file
if self.cycle is not None and self.cycle.end <= 0:
m = self.record_tag.match(line)
if m:
self.cycle.end = self.logutils.convert_date_time(
m.group('datetime'))
def summarize_estimated_analysis(self, estimated_jobs=None):
"""
Summarize estimated job analysis
"""
if estimated_jobs is None and self.estimated_jobs is not None:
estimated_jobs = self.estimated_jobs
einfo = {EJ: []}
sub15mn = 0
sub1hr = 0
sub3hr = 0
sup3hr = 0
total_drifters = 0
total_nondrifters = 0
drift_times = []
for e in estimated_jobs.values():
info = {}
if len(e.estimated_at) > 0:
info[JID] = e.jobid
e_sorted = sorted(e.estimated_at)
info[Eat] = e.estimated_at
if e.started_at is not None:
info[JST] = e.started_at
e_diff = e_sorted[len(e_sorted) - 1] - e_sorted[0]
e_accuracy = (e.started_at -
e.estimated_at[len(e.estimated_at) - 1])
info[ESTR] = e_diff
info[ESTA] = e_accuracy
info[NEST] = e.num_estimates
info[ND] = e.num_drifts
info[JDD] = e.drift_time
drift_times.append(e.drift_time)
if e.drift_time > 0:
total_drifters += 1
if e.drift_time < 15 * 60:
sub15mn += 1
elif e.drift_time < 3600:
sub1hr += 1
elif e.drift_time < 3 * 3600:
sub3hr += 1
else:
sup3hr += 1
else:
total_nondrifters += 1
einfo[EJ].append(info)
info = {}
info[Ds15mn] = sub15mn
info[Ds1hr] = sub1hr
info[Ds3hr] = sub3hr
info[Do3hr] = sup3hr
info[NJD] = total_drifters
info[NJND] = total_nondrifters
if drift_times:
info[DDm] = min(drift_times)
info[DDM] = max(drift_times)
info[DDA] = (sum(drift_times) / len(drift_times))
info[DD50] = sorted(drift_times)[len(drift_times) / 2]
einfo[ESTS] = info
return einfo
def summary(self, cycles=None, showjobs=False):
"""
Scheduler log summary
"""
if self.estimated_parsing_enabled:
self.info[EST] = self.summarize_estimated_analysis()
if self.parse_estimated_only:
return self.info
if cycles is None and self.cycles is not None:
cycles = self.cycles
num_cycle = 0
run = 0
failed = 0
total_considered = 0
run_tm = []
cycle_duration = []
min_duration = None
max_duration = None
mint = maxt = None
calendarduration = 0
schedsolvertime = 0
for c in cycles:
c.summary(showjobs)
self.info[num_cycle] = c.info
run += len(c.sched_job_run.keys())
run_tm.extend(c.sched_job_run.values())
failed += len(c.run_failure.keys())
total_considered += c.num_considered
if max_duration is None or c.duration > max_duration:
max_duration = c.duration
maxt = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(c.start))
if min_duration is None or c.duration < min_duration:
min_duration = c.duration
mint = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(c.start))
cycle_duration.append(c.duration)
num_cycle += 1
calendarduration += sum(c.calendarduration.values())
schedsolvertime += c.scheduler_solver_time
run_rate = self.logutils.get_rate(sorted(run_tm))
sorted_cd = sorted(cycle_duration)
self.summary_info[NC] = len(cycles)
self.summary_info[NJR] = run
self.summary_info[NJFR] = failed
self.summary_info[JRR] = run_rate
self.summary_info[NJC] = total_considered
self.summary_info[mCD] = self.logutils._duration(min_duration)
self.summary_info[MCD] = self.logutils._duration(max_duration)
self.summary_info[CD25] = self.logutils._duration(
self.logutils.percentile(sorted_cd, .25))
if len(sorted_cd) > 0:
self.summary_info[CDA] = self.logutils._duration(
sum(sorted_cd) / len(sorted_cd))
self.summary_info[CD50] = self.logutils._duration(
self.logutils.percentile(sorted_cd, .5))
self.summary_info[CD75] = self.logutils._duration(
self.logutils.percentile(sorted_cd, .75))
if mint is not None:
self.summary_info[mCT] = mint
if maxt is not None:
self.summary_info[MCT] = maxt
self.summary_info[DUR] = self.logutils._duration(sum(cycle_duration))
self.summary_info[TTC] = self.logutils._duration(calendarduration)
self.summary_info[SST] = self.logutils._duration(schedsolvertime)
self.summary_info[VER] = ",".join(self.version)
self.info['summary'] = dict(self.summary_info.items())
return self.info
class PBSCycleInfo(object):
def __init__(self):
self.info = {}
"""
Time between the start and the end of a cycle; the cycle may end
on an alarm or a signal, not only on a Leaving message following
a Starting message
"""
self.duration = 0
" Time of a Starting scheduling cycle message "
self.start = 0
" Time of a Leaving scheduling cycle message "
self.end = 0
" Time at which Considering job to run message "
self.consider = {}
" Number of jobs considered "
self.num_considered = 0
" Time at which job run message in scheduler. This includes time to "
" start the job by the server "
self.sched_job_run = {}
"""
number of jobs added to the calendar, i.e.,
number of backfilling jobs
"""
self.calendared_jobs = {}
" Time between Considering job to run to Job run message "
self.runduration = {}
" Time to determine that job couldn't run "
self.cantrunduration = {}
" List of jobs preempted in order to run high priority job"
self.preempted_jobs = {}
"""
Time between considering job to run to server logging
'Job Run at request...
"""
self.inschedduration = {}
" Total time spent in scheduler solver, insched + cantrun + calendar"
self.scheduler_solver_time = 0
" Error 15XXX in the sched log corresponds to a failure to run"
self.run_failure = {}
" Job failed to be preempted"
self.num_preempt_failure = 0
" Job preempted by "
self.num_preempted = 0
" Time between start of cycle and first job considered to run "
self.queryduration = 0
" The order in which jobs are considered "
self.political_order = []
" Time to calendar "
self.calendarduration = {}
self.lastjob = None
def summary(self, showjobs=False):
"""
Summary regarding cycle
"""
self.info[CST] = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(self.start))
self.info[CD] = PBSLogUtils._duration(self.end - self.start)
self.info[QD] = PBSLogUtils._duration(self.queryduration)
# number of jobs considered may be different than length of
# the consider dictionary due to job arrays being considered once
# per subjob using the parent array job id
self.info[NJC] = self.num_considered
self.info[NJR] = len(self.sched_job_run.keys())
self.info[NJFR] = len(self.run_failure)
self.scheduler_solver_time = (sum(self.inschedduration.values()) +
sum(self.cantrunduration.values()) +
sum(self.calendarduration.values()))
self.info[SST] = self.scheduler_solver_time
self.info[NJCAL] = len(self.calendared_jobs.keys())
self.info[NJFP] = self.num_preempt_failure
self.info[NJP] = self.num_preempted
self.info[TTC] = sum(self.calendarduration.values())
if showjobs:
for j in self.consider.keys():
s = {JID: j}
if j in self.runduration:
s[T2R] = self.runduration[j]
if j in self.cantrunduration:
s[T2D] = self.cantrunduration[j]
if j in self.inschedduration:
s[TiS] = self.inschedduration[j]
if j in self.calendarduration:
s[TTC] = self.calendarduration[j]
if 'jobs' in self.info:
self.info['jobs'].append(s)
else:
self.info['jobs'] = [s]
class PBSMoMLog(PBSLogAnalyzer):
"""
Container and Parser of a PBS ``MoM`` log
"""
tm_tag = re.compile(tm_re)
mom_run_tag = re.compile(tm_re + ".*" + job_re + ".*;Started, pid.*")
mom_end_tag = re.compile(tm_re + ".*" + job_re +
".*;delete job request received.*")
mom_enquejob_tag = re.compile(tm_re + ".*;Type 5 .*")
def __init__(self, filename=None, hostname=None, show_progress=False):
self.filename = filename
self.hostname = hostname
self.show_progress = show_progress
self.start = []
self.end = []
self.queued = []
self.info = {}
self.version = []
def comp_analyze(self, rec, start, end):
m = self.mom_run_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
if ((start is None and end is None) or
self.logutils.in_range(tm, start, end)):
self.start.append(tm)
return PARSER_OK_CONTINUE
elif end is not None and tm > end:
return PARSER_OK_STOP
m = self.mom_end_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
if ((start is None and end is None) or
self.logutils.in_range(tm, start, end)):
self.end.append(tm)
return PARSER_OK_CONTINUE
elif end is not None and tm > end:
return PARSER_OK_STOP
m = self.mom_enquejob_tag.match(rec)
if m:
tm = self.logutils.convert_date_time(m.group('datetime'))
if ((start is None and end is None) or
self.logutils.in_range(tm, start, end)):
self.queued.append(tm)
return PARSER_OK_CONTINUE
elif end is not None and tm > end:
return PARSER_OK_STOP
if 'pbs_version=' in rec:
version = rec.split('pbs_version=')[1].strip()
if version not in self.version:
self.version.append(version)
return PARSER_OK_CONTINUE
def summary(self):
"""
Mom log summary
"""
run_rate = self.logutils.get_rate(self.start)
queue_rate = self.logutils.get_rate(self.queued)
end_rate = self.logutils.get_rate(self.end)
self.info[NJQ] = len(self.queued)
self.info[NJR] = len(self.start)
self.info[NJE] = len(self.end)
self.info[JRR] = run_rate
self.info[JSR] = queue_rate
self.info[JER] = end_rate
self.info[VER] = ",".join(self.version)
return self.info
class PBSAccountingLog(PBSLogAnalyzer):
"""
Container and Parser of a PBS accounting log
"""
tm_tag = re.compile(tm_re)
record_tag = re.compile(r"""
(?P\d\d/\d\d/\d{4,4})[\s]+
(?P