# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

import re
import time
import sys
import datetime
import logging
import traceback
import math
from subprocess import Popen, PIPE
from ptl.utils.pbs_dshutils import DshUtils
from ptl.lib.pbs_testlib import BatchUtils, Server, NODE, JOB, SET, EQ
from ptl.lib.pbs_testlib import ResourceResv
from ptl.utils.pbs_fileutils import FileUtils, FILE_TAIL
  48. """
  49. Analyze ``server``, ``scheduler``, ``MoM``, and ``accounting`` logs.
  50. - Scheduler log analysis:
  51. Extraction of per cycle information including:
  52. cycle start time
  53. cycle duration
  54. time to query objects from server
  55. number of jobs considered
  56. number of jobs run
  57. number of jobs that failed to run
  58. Number of jobs preempted
  59. Number of jobs that failed to preempt
  60. Number of jobs calendared
  61. time to determine that a job can run
  62. time to determine that a job can not run
  63. time spent calendaring
  64. time spent in scheduler solver
  65. Summary of all cycles information
  66. - Server log analysis:
  67. job submit rate
  68. number of jobs ended
  69. number of jobs run
  70. job run rate
  71. job submit rate
  72. job end rate
  73. job wait time distribution
  74. PBS versions
  75. node up rate
  76. wait time
  77. - Mom log analysis:
  78. job submit rate
  79. number of jobs ended
  80. number of jobs run
  81. job run rate
  82. job submit rate
  83. job end rate
  84. PBS versions
  85. - Accounting log analysis:
  86. job submit rate
  87. number of jobs ended
  88. number of jobs run
  89. job run rate
  90. job submit rate
  91. job end rate
  92. job size (cpu and node) distribution
  93. job wait time distribution
  94. utilization
  95. """

tm_re = r'(?P<datetime>\d\d/\d\d/\d{4}\s\d\d:\d\d:\d\d(\.\d{6})?)'
job_re = r';(?P<jobid>[\d\[\d*\]]+)\.'
fail_re = r';(?P<jobid>[\d\[\[]+)\.'

# Server metrics
NUR = 'node_up_rate'

# Scheduler metrics
NC = 'num_cycles'
mCD = 'cycle_duration_min'
MCD = 'cycle_duration_max'
mCT = 'min_cycle_time'
MCT = 'max_cycle_time'
CDA = 'cycle_duration_mean'
CD25 = 'cycle_duration_25p'
CD50 = 'cycle_duration_median'
CD75 = 'cycle_duration_75p'
CST = 'cycle_start_time'
CD = 'cycle_duration'
QD = 'query_duration'
NJC = 'num_jobs_considered'
NJFR = 'num_jobs_failed_to_run'
SST = 'scheduler_solver_time'
NJCAL = 'num_jobs_calendared'
NJFP = 'num_jobs_failed_to_preempt'
NJP = 'num_jobs_preempted'
T2R = 'time_to_run'
T2D = 'time_to_discard'
TiS = 'time_in_sched'
TTC = 'time_to_calendar'

# Scheduling Estimated Start Time
EST = 'estimates'
EJ = 'estimated_jobs'
Eat = 'estimated'
DDm = 'drift_duration_min'
DDM = 'drift_duration_max'
DDA = 'drift_duration_mean'
DD50 = 'drift_duration_median'
ND = 'num_drifts'
NJD = 'num_jobs_drifted'
NJND = 'num_jobs_no_drift'
NEST = 'num_estimates'
JDD = 'job_drift_duration'
ESTR = 'estimated_start_time_range'
ESTA = 'estimated_start_time_accuracy'
JST = 'job_start_time'
ESTS = 'estimated_start_time_summary'
Ds15mn = 'drifted_sub_15mn'
Ds1hr = 'drifted_sub_1hr'
Ds3hr = 'drifted_sub_3hr'
Do3hr = 'drifted_over_3hr'

# Accounting metrics
JWTm = 'job_wait_time_min'
JWTM = 'job_wait_time_max'
JWTA = 'job_wait_time_mean'
JWT25 = 'job_wait_time_25p'
JWT50 = 'job_wait_time_median'
JWT75 = 'job_wait_time_75p'
JRTm = 'job_run_time_min'
JRT25 = 'job_run_time_25p'
JRT50 = 'job_run_time_median'
JRTA = 'job_run_time_mean'
JRT75 = 'job_run_time_75p'
JRTM = 'job_run_time_max'
JNSm = 'job_node_size_min'
JNS25 = 'job_node_size_25p'
JNS50 = 'job_node_size_median'
JNSA = 'job_node_size_mean'
JNS75 = 'job_node_size_75p'
JNSM = 'job_node_size_max'
JCSm = 'job_cpu_size_min'
JCS25 = 'job_cpu_size_25p'
JCS50 = 'job_cpu_size_median'
JCSA = 'job_cpu_size_mean'
JCS75 = 'job_cpu_size_75p'
JCSM = 'job_cpu_size_max'
CPH = 'cpu_hours'
NPH = 'node_hours'
USRS = 'unique_users'
UNCPUS = 'utilization_ncpus'
UNODES = 'utilization_nodes'

# Generic metrics
VER = 'pbs_version'
JID = 'job_id'
JRR = 'job_run_rate'
JSR = 'job_submit_rate'
JER = 'job_end_rate'
NJQ = 'num_jobs_queued'
NJR = 'num_jobs_run'
NJE = 'num_jobs_ended'
DUR = 'duration'
RI = 'custom_interval'
IT = 'init_time'
CF = 'custom_freq'
CFC = 'custom_freq_counts'
CG = 'custom_groups'

PARSER_OK_CONTINUE = 0
PARSER_OK_STOP = 1
PARSER_ERROR_CONTINUE = 2
PARSER_ERROR_STOP = 3

epoch_datetime = datetime.datetime.fromtimestamp(0)


class PBSLogUtils(object):

    """
    Miscellaneous utilities to process log files
    """

    logger = logging.getLogger(__name__)
    du = DshUtils()

    @classmethod
    def convert_date_time(cls, dt=None, fmt=None):
        """
        Convert a datetime string of the form given by fmt into the
        number of seconds since epoch (with possible microseconds).

        :param dt: the datetime string to convert
        :type dt: str or None
        :param fmt: Format to which datetime is to be converted
        :type fmt: str
        :returns: None if conversion fails
        """
        if dt is None:
            return None
        micro = False
        if fmt is None:
            if '.' in dt:
                micro = True
                fmt = "%m/%d/%Y %H:%M:%S.%f"
            else:
                fmt = "%m/%d/%Y %H:%M:%S"
        try:
            # Get datetime object
            t = datetime.datetime.strptime(dt, fmt)
            # Get timedelta object relative to epoch time
            t -= epoch_datetime
            # Get epoch time from the timedelta object
            tm = t.total_seconds()
        except Exception:
            cls.logger.debug("could not convert date time: " + str(dt))
            return None
        if micro:
            return tm
        else:
            return int(tm)
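
    # Usage sketch (illustrative only, not part of the original module).
    # The timestamp is the leading field of a PBS log record; the returned
    # epoch value depends on the local timezone:
    #
    #     secs = PBSLogUtils.convert_date_time('12/25/2017 14:30:05')
    #     subsec = PBSLogUtils.convert_date_time('12/25/2017 14:30:05.123456')
    #     # secs is an int; subsec is a float with microseconds preserved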

    def get_num_lines(self, log, hostname=None, sudo=False):
        """
        Get the number of lines of a particular log

        :param log: the log file name
        :type log: str
        """
        f = self.open_log(log, hostname, sudo=sudo)
        nl = sum([1 for _ in f])
        f.close()
        return nl

    def open_log(self, log, hostname=None, sudo=False):
        """
        :param log: the log file name to read from
        :type log: str
        :param hostname: the hostname from which to read the file
        :type hostname: str or None
        :param sudo: Whether to access log file as a privileged user.
        :type sudo: boolean
        :returns: A file instance
        """
        try:
            if hostname is None or self.du.is_localhost(hostname):
                if sudo:
                    cmd = ['sudo', 'cat', log]
                    self.logger.info('running ' + " ".join(cmd))
                    p = Popen(cmd, stdout=PIPE)
                    f = p.stdout
                else:
                    f = open(log)
            else:
                cmd = ['ssh', hostname]
                if sudo:
                    cmd += ['sudo']
                cmd += ['cat', log]
                self.logger.debug('running ' + " ".join(cmd))
                p = Popen(cmd, stdout=PIPE)
                f = p.stdout
        except Exception:
            traceback.print_exc()
            self.logger.error('Problem processing file ' + log)
            f = None
        return f

    def get_timestamps(self, logfile=None, hostname=None, num=None,
                       sudo=False):
        """
        Helper function to parse a logfile

        :returns: Each timestamp in a list as number of seconds since epoch
        """
        if logfile is None:
            return
        records = self.open_log(logfile, hostname, sudo=sudo)
        if records is None:
            return
        rec_times = []
        tm_tag = re.compile(tm_re)
        num_rec = 0
        for record in records:
            num_rec += 1
            if num is not None and num_rec > num:
                break
            m = tm_tag.match(record)
            if m:
                rec_times.append(
                    self.convert_date_time(m.group('datetime')))
        records.close()
        return rec_times

    def match_msg(self, lines, msg, allmatch=False, regexp=False,
                  starttime=None, endtime=None):
        """
        :returns: A tuple (x, y) where x is the line number and y the
                  matching line, or None if nothing is found.

        :param allmatch: If True (False by default), return a list
                         of matching tuples.
        :type allmatch: boolean
        :param regexp: If True, msg is a Python regular expression.
                       Defaults to False.
        :type regexp: bool
        :param starttime: If set, ignore matches that occur before
                          the specified time
        :param endtime: If set, ignore matches that occur after
                        the specified time
        """
        linecount = 0
        ret = []
        if lines:
            for l in lines:
                if starttime is not None:
                    # l[:19] captures the log record time
                    tm = self.convert_date_time(l[:19])
                    if tm is None or tm < starttime:
                        continue
                if endtime is not None:
                    # l[:19] captures the log record time
                    tm = self.convert_date_time(l[:19])
                    if tm is None or tm > endtime:
                        continue
                if ((regexp and re.search(msg, l)) or
                        (not regexp and l.find(msg) != -1)):
                    m = (linecount, l)
                    if allmatch:
                        ret.append(m)
                    else:
                        return m
                linecount += 1
        if len(ret) > 0:
            return ret
        return None
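
    # Usage sketch (illustrative; the file path and message are
    # hypothetical):
    #
    #     utils = PBSLogUtils()
    #     lines = utils.open_log('/var/spool/pbs/server_logs/20180101')
    #     m = utils.match_msg(lines, 'Job Run at', allmatch=True)
    #     # m is a list of (line_number, line) tuples, or None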

    @staticmethod
    def convert_resv_date_time(date_time):
        """
        Convert a reservation datetime string to seconds since epoch
        """
        try:
            t = time.strptime(date_time, "%a %b %d %H:%M:%S %Y")
        except Exception:
            t = time.localtime()
        return int(time.mktime(t))

    @staticmethod
    def convert_hhmmss_time(tm):
        """
        Convert a duration in hh:mm:ss format to seconds
        """
        if ':' not in tm:
            return tm
        hms = tm.split(':')
        return int(int(hms[0]) * 3600 + int(hms[1]) * 60 + int(hms[2]))
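
    # Worked example (illustrative): '01:30:15' -> 1*3600 + 30*60 + 15 = 5415
    #
    #     PBSLogUtils.convert_hhmmss_time('01:30:15')  # 5415
    #     PBSLogUtils.convert_hhmmss_time('42')        # returned unchanged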

    def get_rate(self, l=[]):
        """
        :returns: The frequency of occurrences of array l
                  The array is expected to be sorted
        """
        if len(l) > 0:
            duration = l[len(l) - 1] - l[0]
            if duration > 0:
                tm_factor = [1, 60, 60, 24]
                _rate = float(len(l)) / float(duration)
                index = 0
                # scale the rate to the next larger time unit until it is
                # at least 1 (per second -> per minute -> per hour -> per day)
                while _rate < 1 and index < len(tm_factor) - 1:
                    index += 1
                    _rate *= tm_factor[index]
                _rate = "%.2f" % (_rate)
                if index == 0:
                    _rate = str(_rate) + '/s'
                elif index == 1:
                    _rate = str(_rate) + '/mn'
                elif index == 2:
                    _rate = str(_rate) + '/hr'
                else:
                    _rate = str(_rate) + '/day'
            else:
                _rate = str(len(l)) + '/s'
            return _rate
        return 0
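
    # Usage sketch (illustrative timestamps, in seconds since epoch):
    #
    #     utils = PBSLogUtils()
    #     utils.get_rate([100, 101, 102, 103])  # 4 events in 3s -> '1.33/s'
    #     utils.get_rate([0, 3600])             # 2 events in 1hr -> '2.00/hr'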

    def in_range(self, tm, start=None, end=None):
        """
        :param tm: time to check within a provided range
        :param start: Lower limit for the time range
        :param end: Higher limit for the time range
        :returns: True if time is in the range, otherwise False
        """
        if start is None and end is None:
            return True
        if start is None and end is not None:
            return tm <= end
        if start is not None and end is None:
            return tm >= start
        return start <= tm <= end

    @staticmethod
    def _duration(val=None):
        if val is not None:
            return str(datetime.timedelta(seconds=int(val)))

    @staticmethod
    def get_day(tm=None):
        """
        :param tm: Time for which to get a day
        """
        if tm is None:
            tm = time.time()
        return time.strftime("%Y%m%d", time.localtime(tm))

    @staticmethod
    def percentile(N, percent):
        """
        Find the percentile of a list of values.

        :param N: A list of values. Note N MUST BE already sorted.
        :type N: List
        :param percent: A float value from 0.0 to 1.0.
        :type percent: Float
        :returns: The percentile of the values
        """
        if not N:
            return None
        k = (len(N) - 1) * percent
        f = math.floor(k)
        c = math.ceil(k)
        if f == c:
            return N[int(k)]
        # linear interpolation between the two nearest ranks
        d0 = N[int(f)] * (c - k)
        d1 = N[int(c)] * (k - f)
        return d0 + d1
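
    # Worked example (linear interpolation between ranks): for
    # N = [10, 20, 30, 40] and percent = 0.25, k = 3 * 0.25 = 0.75, so the
    # result is 10 * 0.25 + 20 * 0.75 = 17.5.
    #
    #     PBSLogUtils.percentile([10, 20, 30, 40], 0.25)  # 17.5
    #     PBSLogUtils.percentile([10, 20, 30, 40], 0.5)   # 25.0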

    @staticmethod
    def process_intervals(intervals, groups, frequency=60):
        """
        Process the intervals
        """
        info = {}
        if not intervals:
            return info
        val = [x - intervals[i - 1] for i, x in enumerate(intervals) if i > 0]
        info[RI] = ", ".join(map(lambda v: str(v), val))
        if intervals:
            info[IT] = intervals[0]
        if frequency is not None:
            _cf = []
            j = 0
            i = 1
            while i < len(intervals):
                if (intervals[i] - intervals[j]) > frequency:
                    _cf.append(((intervals[j], intervals[i - 1]), i - j))
                    j = i
                i += 1
            if i != j + 1:
                _cf.append(((intervals[j], intervals[i - 1]), i - j))
            else:
                _cf.append(((intervals[j], intervals[j]), 1))
            info[CFC] = _cf
            info[CF] = frequency
        if groups:
            info[CG] = groups
        return info
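
    # Usage sketch (illustrative): group match timestamps into windows no
    # longer than `frequency` seconds and count the matches per window:
    #
    #     info = PBSLogUtils.process_intervals([0, 10, 20, 100], {}, 60)
    #     # info['custom_freq_counts'] -> [((0, 20), 3), ((100, 100), 1)]
    #     # info['custom_interval']    -> '10, 10, 80'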

    def get_log_files(self, hostname, path, start, end, sudo=False):
        """
        :param hostname: Hostname of the machine
        :type hostname: str
        :param path: Path for the log file
        :type path: str
        :param start: Start time for the log file
        :param end: End time for the log file
        :returns: list of log file(s) found or an empty list
        """
        paths = []
        if self.du.isdir(hostname, path, sudo=sudo):
            logs = self.du.listdir(hostname, path, sudo=sudo)
            for f in sorted(logs):
                if start is not None or end is not None:
                    tm = self.get_timestamps(f, hostname, num=1, sudo=sudo)
                    if not tm:
                        continue
                    d1 = time.strftime("%Y%m%d", time.localtime(tm[0]))
                    if start is not None:
                        d2 = time.strftime("%Y%m%d", time.localtime(start))
                        if d1 < d2:
                            continue
                    if end is not None:
                        d2 = time.strftime("%Y%m%d", time.localtime(end))
                        if d1 > d2:
                            continue
                paths.append(f)
        elif self.du.isfile(hostname, path, sudo=sudo):
            paths = [path]
        return paths


class PBSLogAnalyzer(object):

    """
    Utility to analyze the PBS logs
    """

    logger = logging.getLogger(__name__)
    logutils = PBSLogUtils()

    generic_tag = re.compile(tm_re + ".*")
    node_type_tag = re.compile(tm_re + ".*" + "Type 58 request.*")
    queue_type_tag = re.compile(tm_re + ".*" + "Type 20 request.*")
    job_type_tag = re.compile(tm_re + ".*" + "Type 51 request.*")
    job_exit_tag = re.compile(tm_re + ".*" + job_re + ";Exit_status.*")

    def __init__(self, schedlog=None, serverlog=None,
                 momlog=None, acctlog=None, genericlog=None,
                 hostname=None, show_progress=False):
        self.hostname = hostname
        self.schedlog = schedlog
        self.serverlog = serverlog
        self.acctlog = acctlog
        self.momlog = momlog
        self.genericlog = genericlog
        self.show_progress = show_progress
        self._custom_tag = None
        self._custom_freq = None
        self._custom_id = False
        self._re_interval = []
        self._re_group = {}
        self.num_conditional_matches = 0
        self.re_conditional = None
        self.num_conditionals = 0
        self.prev_records = []
        self.info = {}
        self.scheduler = None
        self.server = None
        self.mom = None
        self.accounting = None
        if schedlog:
            self.scheduler = PBSSchedulerLog(schedlog, hostname, show_progress)
        if serverlog:
            self.server = PBSServerLog(serverlog, hostname, show_progress)
        if momlog:
            self.mom = PBSMoMLog(momlog, hostname, show_progress)
        if acctlog:
            self.accounting = PBSAccountingLog(acctlog, hostname,
                                               show_progress)

    def set_custom_match(self, pattern, frequency=None):
        """
        Set the custom matching

        :param pattern: Matching pattern
        :type pattern: str
        :param frequency: Frequency of match
        :type frequency: int
        """
        self._custom_tag = re.compile(tm_re + ".*" + pattern + ".*")
        self._custom_freq = frequency

    def set_conditional_match(self, conditions):
        """
        Set the conditional match

        :param conditions: Conditions for matching
        """
        if not isinstance(conditions, list):
            return False
        self.re_conditional = conditions
        self.num_conditionals = len(conditions)
        self.prev_records = map(lambda n: '', range(self.num_conditionals))
        self.info['matches'] = []
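
    # Usage sketch (illustrative pattern and conditions):
    #
    #     analyzer = PBSLogAnalyzer()
    #     # count occurrences of a custom message, bucketed every 60s
    #     analyzer.set_custom_match('Considering job to run', frequency=60)
    #     # or count 'Type 20' requests immediately preceded by Exit_status
    #     analyzer.set_conditional_match(
    #         ['Type 20 request received from Scheduler', 'Exit_status'])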

    def analyze_scheduler_log(self, filename=None, start=None, end=None,
                              hostname=None, summarize=True):
        """
        Analyze the scheduler log

        :param filename: Scheduler log file name
        :type filename: str or None
        :param start: Time from which the log is to be analyzed
        :param end: Time up to which the log is to be analyzed
        :param hostname: Hostname of the machine
        :type hostname: str or None
        :param summarize: Summarize the parsed data if True
        :type summarize: bool
        """
        if self.scheduler is None:
            self.scheduler = PBSSchedulerLog(filename, hostname=hostname)
        return self.scheduler.analyze(filename, start, end, hostname,
                                      summarize)

    def analyze_server_log(self, filename=None, start=None, end=None,
                           hostname=None, summarize=True):
        """
        Analyze the server log
        """
        if self.server is None:
            self.server = PBSServerLog(filename, hostname=hostname)
        return self.server.analyze(filename, start, end, hostname,
                                   summarize)

    def analyze_accounting_log(self, filename=None, start=None, end=None,
                               hostname=None, summarize=True):
        """
        Analyze the accounting log
        """
        if self.accounting is None:
            self.accounting = PBSAccountingLog(filename, hostname=hostname)
        return self.accounting.analyze(filename, start, end, hostname,
                                       summarize=summarize, sudo=True)

    def analyze_mom_log(self, filename=None, start=None, end=None,
                        hostname=None, summarize=True):
        """
        Analyze the mom log
        """
        if self.mom is None:
            self.mom = PBSMoMLog(filename, hostname=hostname)
        return self.mom.analyze(filename, start, end, hostname, summarize)

    def parse_conditional(self, rec, start, end):
        """
        Match a sequence of regular expressions against multiple
        consecutive lines in a generic log and count the number of
        lines that match the whole sequence.

        Example usage: to find the number of times the scheduler
        stat'ing the server causes the scheduler to miss jobs ending,
        which could possibly indicate a race condition between the
        view of resources assigned to nodes and the actual jobs
        running, one would call this function by setting
        re_conditional to
        ``['Type 20 request received from Scheduler', 'Exit_status']``
        which can be read as counting the number of times that the
        Type 20 message is preceded by an ``Exit_status`` message.
        """
        match = True
        for rc in range(self.num_conditionals):
            if not re.search(self.re_conditional[rc], self.prev_records[rc]):
                match = False
        if match:
            self.num_conditional_matches += 1
            self.info['matches'].extend(self.prev_records)
        # shift the window of previous records and prepend the current one
        for i in range(self.num_conditionals - 1, 0, -1):
            self.prev_records[i] = self.prev_records[i - 1]
        self.prev_records[0] = rec
        return PARSER_OK_CONTINUE

    def parse_custom_tag(self, rec, start, end):
        m = self._custom_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            if ((start is None and end is None) or
                    self.logutils.in_range(tm, start, end)):
                self._re_interval.append(tm)
                for k, v in m.groupdict().items():
                    if k in self._re_group:
                        self._re_group[k].append(v)
                    else:
                        self._re_group[k] = [v]
            elif end is not None and tm > end:
                return PARSER_OK_STOP
        return PARSER_OK_CONTINUE

    def parse_block(self, rec, start, end):
        m = self.generic_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            if ((start is None and end is None) or
                    self.logutils.in_range(tm, start, end)):
                print rec,

    def comp_analyze(self, rec, start, end):
        if self.re_conditional is not None:
            return self.parse_conditional(rec, start, end)
        elif self._custom_tag is not None:
            return self.parse_custom_tag(rec, start, end)
        elif start is not None or end is not None:
            return self.parse_block(rec, start, end)

    def analyze(self, path=None, start=None, end=None, hostname=None,
                summarize=True, sudo=False):
        """
        Parse any log file. This method is not ``context-specific``
        to each log file type.

        :param path: name of ``file/dir`` to parse
        :type path: str or None
        :param start: optional record time at which to start analyzing
        :param end: optional record time after which to stop analyzing
        :param hostname: name of host on which to operate. Defaults to
                         localhost
        :type hostname: str or None
        :param summarize: if True, summarize data parsed. Defaults to
                          True.
        :type summarize: bool
        :param sudo: If True, access log file(s) as privileged user.
        :type sudo: bool
        """
        if hostname is None and self.hostname is not None:
            hostname = self.hostname
        for f in self.logutils.get_log_files(hostname, path, start, end,
                                             sudo=sudo):
            self._log_parser(f, start, end, hostname, sudo=sudo)
        if summarize:
            return self.summary()

    def _log_parser(self, filename, start, end, hostname=None, sudo=False):
        if filename is not None:
            records = self.logutils.open_log(filename, hostname, sudo=sudo)
        else:
            return None
        if records is None:
            return None
        num_records = self.logutils.get_num_lines(filename, hostname,
                                                  sudo=sudo)
        num_line = 0
        last_rec = None
        if self.show_progress:
            perc_range = range(10, 110, 10)
            perc_records = map(lambda x: num_records * x / 100, perc_range)
            sys.stderr.write('Parsing ' + filename + ': |0%')
            sys.stderr.flush()
        for rec in records:
            num_line += 1
            if self.show_progress and (num_line > perc_records[0]):
                sys.stderr.write('-' + str(perc_range[0]) + '%')
                sys.stderr.flush()
                perc_range.remove(perc_range[0])
                perc_records.remove(perc_records[0])
            last_rec = rec
            rv = self.comp_analyze(rec, start, end)
            if (rv in (PARSER_OK_STOP, PARSER_ERROR_STOP) or
                    (self.show_progress and len(perc_records) == 0)):
                break
        if self.show_progress:
            sys.stderr.write('-100%|\n')
            sys.stderr.flush()
        records.close()
        if last_rec is not None:
            self.epilogue(last_rec)

    def analyze_logs(self, schedlog=None, serverlog=None, momlog=None,
                     acctlog=None, genericlog=None, start=None, end=None,
                     hostname=None, showjob=False):
        """
        Analyze all of the logs
        """
        if hostname is None and self.hostname is not None:
            hostname = self.hostname
        if schedlog is None and self.schedlog is not None:
            schedlog = self.schedlog
        if serverlog is None and self.serverlog is not None:
            serverlog = self.serverlog
        if momlog is None and self.momlog is not None:
            momlog = self.momlog
        if acctlog is None and self.acctlog is not None:
            acctlog = self.acctlog
        if genericlog is None and self.genericlog is not None:
            genericlog = self.genericlog
        cycles = None
        sjr = {}
        if schedlog:
            self.analyze_scheduler_log(schedlog, start, end, hostname,
                                       summarize=False)
            cycles = self.scheduler.cycles
        if serverlog:
            self.analyze_server_log(serverlog, start, end, hostname,
                                    summarize=False)
            sjr = self.server.server_job_run
        if momlog:
            self.analyze_mom_log(momlog, start, end, hostname,
                                 summarize=False)
        if acctlog:
            self.analyze_accounting_log(acctlog, start, end, hostname,
                                        summarize=False)
        if genericlog:
            self.analyze(genericlog, start, end, hostname, sudo=True,
                         summarize=False)
        if cycles is not None and len(sjr.keys()) != 0:
            for cycle in cycles:
                for jid, tm in cycle.sched_job_run.items():
                    # Skip job arrays: the scheduler runs a subjob, but we
                    # don't keep track of which "Considering job to run"
                    # message it is associated with because that message
                    # doesn't show the subjob index.
                    if '[' in jid:
                        continue
                    if jid in sjr:
                        for tm in sjr[jid]:
                            if tm > cycle.start and tm < cycle.end:
                                cycle.inschedduration[jid] = \
                                    tm - cycle.consider[jid]
        return self.summary(showjob)
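
    # Usage sketch (illustrative paths; requires a PTL installation):
    #
    #     la = PBSLogAnalyzer(
    #         schedlog='/var/spool/pbs/sched_logs/20180101',
    #         serverlog='/var/spool/pbs/server_logs/20180101')
    #     summary = la.analyze_logs(showjob=True)
    #     # summary['scheduler'] and summary['server'] hold per-log metrics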

    def epilogue(self, line):
        pass

    def summary(self, showjob=False, writer=None):
        info = {}
        if self._custom_tag is not None:
            self.info = self.logutils.process_intervals(self._re_interval,
                                                        self._re_group,
                                                        self._custom_freq)
            return self.info
        if self.re_conditional is not None:
            self.info['num_conditional_matches'] = self.num_conditional_matches
            return self.info
        if self.scheduler is not None:
            info['scheduler'] = self.scheduler.summary(self.scheduler.cycles,
                                                       showjob)
        if self.server is not None:
            info['server'] = self.server.summary()
        if self.accounting is not None:
            info['accounting'] = self.accounting.summary()
        if self.mom is not None:
            info['mom'] = self.mom.summary()
        return info


class PBSServerLog(PBSLogAnalyzer):

    """
    :param filename: Server log filename
    :type filename: str or None
    :param hostname: Hostname of the machine
    :type hostname: str or None
    """

    tm_tag = re.compile(tm_re)
    server_run_tag = re.compile(tm_re + ".*" + job_re + ".*;Job Run at.*")
    server_nodeup_tag = re.compile(tm_re + ".*Node;.*;node up.*")
    server_enquejob_tag = re.compile(tm_re + ".*" + job_re +
                                     ".*enqueuing into.*state 1 .*")
    server_endjob_tag = re.compile(tm_re + ".*" + job_re +
                                   ".*;Exit_status.*")

    def __init__(self, filename=None, hostname=None, show_progress=False):
        self.server_job_queued = {}
        self.server_job_run = {}
        self.server_job_end = {}
        self.records = None
        self.nodeup = []
        self.enquejob = []
        self.record_tm = []
        self.jobsrun = []
        self.jobsend = []
        self.wait_time = []
        self.run_time = []
        self.hostname = hostname
        self.info = {}
        self.version = []
        self.filename = filename
        self.show_progress = show_progress

    def parse_runjob(self, line):
        """
        Parse the server log for "Job Run" records.
        For each record, keep track of the job id and time in a
        dedicated array
        """
        m = self.server_run_tag.match(line)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.jobsrun.append(tm)
            jobid = str(m.group('jobid'))
            if jobid in self.server_job_run:
                self.server_job_run[jobid].append(tm)
            else:
                self.server_job_run[jobid] = [tm]
            if jobid in self.server_job_queued:
                self.wait_time.append(tm - self.server_job_queued[jobid])

    def parse_endjob(self, line):
        """
        Parse the server log for job end (Exit_status) records.
        For each record, keep track of the job id and time in a
        dedicated array
        """
        m = self.server_endjob_tag.match(line)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.jobsend.append(tm)
            jobid = str(m.group('jobid'))
            if jobid in self.server_job_end:
                self.server_job_end[jobid].append(tm)
            else:
                self.server_job_end[jobid] = [tm]
            if jobid in self.server_job_run:
                self.run_time.append(tm - self.server_job_run[jobid][-1])

    def parse_nodeup(self, line):
        """
        Parse the server log for nodes that come up
        """
        m = self.server_nodeup_tag.match(line)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.nodeup.append(tm)

    def parse_enquejob(self, line):
        """
        Parse the server log for enqueued jobs
        """
        m = self.server_enquejob_tag.match(line)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.enquejob.append(tm)
            jobid = str(m.group('jobid'))
            self.server_job_queued[jobid] = tm

    def comp_analyze(self, rec, start=None, end=None):
        m = self.tm_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.record_tm.append(tm)
            if not self.logutils.in_range(tm, start, end):
                if end and tm > end:
                    return PARSER_OK_STOP
                return PARSER_OK_CONTINUE
        if 'pbs_version=' in rec:
            version = rec.split('pbs_version=')[1].strip()
            if version not in self.version:
                self.version.append(version)
        self.parse_enquejob(rec)
        self.parse_nodeup(rec)
        self.parse_runjob(rec)
        self.parse_endjob(rec)
        return PARSER_OK_CONTINUE

    def summary(self):
        self.info[JSR] = self.logutils.get_rate(self.enquejob)
        self.info[NJE] = len(self.server_job_end.keys())
        self.info[NJQ] = len(self.enquejob)
        self.info[NUR] = self.logutils.get_rate(self.nodeup)
        self.info[JRR] = self.logutils.get_rate(self.jobsrun)
        self.info[JER] = self.logutils.get_rate(self.jobsend)
        if len(self.wait_time) > 0:
            wt = sorted(self.wait_time)
            wta = float(sum(self.wait_time)) / len(self.wait_time)
            self.info[JWTm] = self.logutils._duration(min(wt))
            self.info[JWTM] = self.logutils._duration(max(wt))
            self.info[JWTA] = self.logutils._duration(wta)
            self.info[JWT25] = self.logutils._duration(
                self.logutils.percentile(wt, .25))
            self.info[JWT50] = self.logutils._duration(
                self.logutils.percentile(wt, .5))
            self.info[JWT75] = self.logutils._duration(
                self.logutils.percentile(wt, .75))
        njr = 0
        for v in self.server_job_run.values():
            njr += len(v)
        self.info[NJR] = njr
        self.info[VER] = ",".join(self.version)
        if len(self.run_time) > 0:
            rt = sorted(self.run_time)
            self.info[JRTm] = self.logutils._duration(min(rt))
            self.info[JRT25] = self.logutils._duration(
                self.logutils.percentile(rt, 0.25))
            self.info[JRT50] = self.logutils._duration(
                self.logutils.percentile(rt, 0.50))
            self.info[JRTA] = self.logutils._duration(sum(rt) / len(rt))
            self.info[JRT75] = self.logutils._duration(
                self.logutils.percentile(rt, 0.75))
            self.info[JRTM] = self.logutils._duration(max(rt))
        return self.info


class JobEstimatedStartTimeInfo(object):

    """
    Information regarding a job's estimated start time
    """

    def __init__(self, jobid):
        self.jobid = jobid
        self.started_at = None
        self.estimated_at = []
        self.num_drifts = 0
        self.num_estimates = 0
        self.drift_time = 0

    def add_estimate(self, tm):
        """
        Add a job's new estimated start time.

        If the new estimate is later than any previous one, we add
        the difference to the drift time. If the new estimate is
        pulled earlier, it is not added to the drift time.

        Drift time is a measure of the "negative perception" that
        comes with a job being estimated to run at a later date than
        previously "advertised".
        """
        if self.estimated_at:
            prev_tm = self.estimated_at[-1]
            if tm > prev_tm:
                self.num_drifts += 1
                self.drift_time += tm - prev_tm
        self.estimated_at.append(tm)
        self.num_estimates += 1
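
    # Worked example (illustrative epoch timestamps): estimates at t=100,
    # t=50, then t=200 yield one drift of 200 - 50 = 150 seconds; the move
    # from 100 down to 50 does not count as drift:
    #
    #     e = JobEstimatedStartTimeInfo('1.server')
    #     for t in (100, 50, 200):
    #         e.add_estimate(t)
    #     # e.num_estimates == 3, e.num_drifts == 1, e.drift_time == 150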

    def __repr__(self):
        estimated_at_str = map(lambda t: str(t), self.estimated_at)
        return " ".join([str(self.jobid), 'started: ', str(self.started_at),
                         'estimated: ', ",".join(estimated_at_str)])

    def __str__(self):
        return self.__repr__()


class PBSSchedulerLog(PBSLogAnalyzer):

    tm_tag = re.compile(tm_re)
    startcycle_tag = re.compile(tm_re + ".*Starting Scheduling.*")
    endcycle_tag = re.compile(tm_re + ".*Leaving [(the )]*[sS]cheduling.*")
    alarm_tag = re.compile(tm_re + ".*alarm.*")
    considering_job_tag = re.compile(tm_re + ".*" + job_re +
                                     ".*;Considering job to run.*")
    sched_job_run_tag = re.compile(tm_re + ".*" + job_re + ".*;Job run.*")
    estimated_tag = re.compile(tm_re + ".*" + job_re +
                               ".*;Job is a top job and will run at "
                               "(?P<est_tm>.*)")
    run_failure_tag = re.compile(tm_re + ".*" + fail_re + ".*;Failed to run.*")
    calendarjob_tag = re.compile(tm_re + ".*" + job_re +
                                 ".*;Job is a top job.*")
    preempt_failure_tag = re.compile(tm_re + ".*;Job failed to be preempted.*")
    preempt_tag = re.compile(tm_re + ".*" + job_re + ".*;Job preempted.*")
    record_tag = re.compile(tm_re + ".*")

    def __init__(self, filename=None, hostname=None, show_progress=False):
        self.filename = filename
        self.hostname = hostname
        self.show_progress = show_progress
        self.record_tm = []
        self.version = []
        self.cycle = None
        self.cycles = []
        self.estimated_jobs = {}
        self.estimated_parsing_enabled = False
        self.parse_estimated_only = False
        self.info = {}
        self.summary_info = {}

    def _parse_line(self, line):
        """
        Parse the scheduling cycle Starting, Leaving, and alarm records.
        From each record, keep track of the record time in a
        dedicated array
        """
        m = self.startcycle_tag.match(line)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            # if the cycle was interrupted, assume the previous cycle
            # ended now
            if self.cycle is not None and self.cycle.end == -1:
                self.cycle.end = tm
            self.cycle = PBSCycleInfo()
            self.cycles.append(self.cycle)
            self.cycle.start = tm
            self.cycle.end = -1
            return PARSER_OK_CONTINUE
        m = self.endcycle_tag.match(line)
        if m is not None and self.cycle is not None:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.cycle.end = tm
            self.cycle.duration = tm - self.cycle.start
            if (self.cycle.lastjob is not None and
                    self.cycle.lastjob not in self.cycle.sched_job_run and
                    self.cycle.lastjob not in self.cycle.calendared_jobs):
                self.cycle.cantrunduration[self.cycle.lastjob] = (
                    tm - self.cycle.consider[self.cycle.lastjob])
            return PARSER_OK_CONTINUE
        m = self.alarm_tag.match(line)
        if m is not None and self.cycle is not None:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.cycle.end = tm
            return PARSER_OK_CONTINUE
        m = self.considering_job_tag.match(line)
        if m is not None and self.cycle is not None:
            self.cycle.num_considered += 1
            jid = str(m.group('jobid'))
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.cycle.consider[jid] = tm
            self.cycle.political_order.append(jid)
            if (self.cycle.lastjob is not None and
                    self.cycle.lastjob not in self.cycle.sched_job_run and
                    self.cycle.lastjob not in self.cycle.calendared_jobs):
                self.cycle.cantrunduration[self.cycle.lastjob] = (
                    tm - self.cycle.consider[self.cycle.lastjob])
            self.cycle.lastjob = jid
            if self.cycle.queryduration == 0:
                self.cycle.queryduration = tm - self.cycle.start
            return PARSER_OK_CONTINUE
        m = self.sched_job_run_tag.match(line)
        if m is not None and self.cycle is not None:
            jid = str(m.group('jobid'))
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.cycle.sched_job_run[jid] = tm
            # job arrays require special handling because the "Considering
            # job to run" message does not have the subjob index, only []
            if '[' in jid:
                subjid = jid
                if subjid not in self.cycle.consider:
                    jid = jid.split('[')[0] + '[]'
                self.cycle.consider[subjid] = self.cycle.consider[jid]
                self.cycle.runduration[subjid] = tm - self.cycle.consider[jid]
            # jobs rerun due to preemption failure aren't considered, skip
            elif jid in self.cycle.consider:
                self.cycle.runduration[jid] = tm - self.cycle.consider[jid]
            return PARSER_OK_CONTINUE
        m = self.run_failure_tag.match(line)
        if m is not None:
            if self.cycle is not None:
                jid = str(m.group('jobid'))
                tm = self.logutils.convert_date_time(m.group('datetime'))
                self.cycle.run_failure[jid] = tm
            return PARSER_OK_CONTINUE
        m = self.preempt_failure_tag.match(line)
        if m is not None:
            if self.cycle is not None:
                self.cycle.num_preempt_failure += 1
            return PARSER_OK_CONTINUE
        m = self.preempt_tag.match(line)
        if m is not None:
            if self.cycle is not None:
                jid = str(m.group('jobid'))
                if self.cycle.lastjob in self.cycle.preempted_jobs:
                    self.cycle.preempted_jobs[self.cycle.lastjob].append(jid)
                else:
                    self.cycle.preempted_jobs[self.cycle.lastjob] = [jid]
                self.cycle.num_preempted += 1
            return PARSER_OK_CONTINUE
        m = self.calendarjob_tag.match(line)
        if m is not None:
            if self.cycle is not None:
                jid = str(m.group('jobid'))
                tm = self.logutils.convert_date_time(m.group('datetime'))
                self.cycle.calendared_jobs[jid] = tm
                if jid in self.cycle.consider:
                    self.cycle.calendarduration[jid] = \
                        (tm - self.cycle.consider[jid])
                elif '[' in jid:
                    arrjid = re.sub(r"(\[\d+\])", '[]', jid)
                    if arrjid in self.cycle.consider:
                        self.cycle.consider[jid] = self.cycle.consider[arrjid]
                        self.cycle.calendarduration[jid] = \
                            (tm - self.cycle.consider[arrjid])
            return PARSER_OK_CONTINUE

    def get_cycles(self, start=None, end=None):
        """
        Get the scheduler cycles

        :param start: Start time
        :param end: End time
        :returns: Scheduling cycles
        """
        if start is None and end is None:
            return self.cycles
        cycles = []
        if end is None:
            end = int(time.time())
        for c in self.cycles:
            if c.start >= start and c.end < end:
                cycles.append(c)
        return cycles
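
    # Usage sketch (illustrative path and epoch timestamp): restrict the
    # parsed cycles to a time window:
    #
    #     sl = PBSSchedulerLog('/var/spool/pbs/sched_logs/20180101')
    #     sl.analyze(sl.filename, summarize=False)
    #     recent = sl.get_cycles(start=1514764800)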

    def comp_analyze(self, rec, start, end):
        if self.estimated_parsing_enabled:
            rv = self.estimated_info_parsing(rec)
            if self.parse_estimated_only:
                return rv
        return self.scheduler_parsing(rec, start, end)

    def scheduler_parsing(self, rec, start, end):
        m = self.tm_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            self.record_tm.append(tm)
            if self.logutils.in_range(tm, start, end):
                rv = self._parse_line(rec)
                if rv in (PARSER_OK_STOP, PARSER_ERROR_STOP):
                    return rv
                if 'pbs_version=' in rec:
                    version = rec.split('pbs_version=')[1].strip()
                    if version not in self.version:
                        self.version.append(version)
            elif end is not None and tm > end:
                return PARSER_OK_STOP
        return PARSER_OK_CONTINUE

    def estimated_info_parsing(self, line):
        """
        Parse the estimated start time information of a job
        """
        m = self.sched_job_run_tag.match(line)
        if m is not None:
            jid = str(m.group('jobid'))
            tm = self.logutils.convert_date_time(m.group('datetime'))
            if jid in self.estimated_jobs:
                self.estimated_jobs[jid].started_at = tm
            else:
                ej = JobEstimatedStartTimeInfo(jid)
                ej.started_at = tm
                self.estimated_jobs[jid] = ej
        m = self.estimated_tag.match(line)
        if m is not None:
            jid = str(m.group('jobid'))
            try:
                tm = self.logutils.convert_date_time(m.group('est_tm'),
                                                     "%a %b %d %H:%M:%S %Y")
            except Exception:
                logging.error('error converting time: ' +
                              str(m.group('est_tm')))
                return PARSER_ERROR_STOP
            if jid in self.estimated_jobs:
                self.estimated_jobs[jid].add_estimate(tm)
            else:
                ej = JobEstimatedStartTimeInfo(jid)
                ej.add_estimate(tm)
                self.estimated_jobs[jid] = ej
        return PARSER_OK_CONTINUE
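
    # Usage sketch (illustrative): enable estimated start time analysis
    # before parsing, then summarize the drift information:
    #
    #     sl = PBSSchedulerLog('/var/spool/pbs/sched_logs/20180101')
    #     sl.estimated_parsing_enabled = True
    #     sl.parse_estimated_only = True
    #     sl.analyze(sl.filename, summarize=False)
    #     est = sl.summarize_estimated_analysis()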

    def epilogue(self, line):
        # If the log ends in the middle of a cycle, there is no
        # "Leaving cycle" message; in this case the last cycle duration
        # is computed from its start to the last record in the log file.
        if self.cycle is not None and self.cycle.end <= 0:
            m = self.record_tag.match(line)
            if m:
                self.cycle.end = self.logutils.convert_date_time(
                    m.group('datetime'))

    def summarize_estimated_analysis(self, estimated_jobs=None):
        """
        Summarize the estimated job analysis
        """
        if estimated_jobs is None and self.estimated_jobs is not None:
            estimated_jobs = self.estimated_jobs
        einfo = {EJ: []}
        sub15mn = 0
        sub1hr = 0
        sub3hr = 0
        sup3hr = 0
        total_drifters = 0
        total_nondrifters = 0
        drift_times = []
        for e in estimated_jobs.values():
            info = {}
            if len(e.estimated_at) > 0:
                info[JID] = e.jobid
                e_sorted = sorted(e.estimated_at)
                info[Eat] = e.estimated_at
                if e.started_at is not None:
                    info[JST] = e.started_at
                    e_diff = e_sorted[-1] - e_sorted[0]
                    e_accuracy = e.started_at - e.estimated_at[-1]
                    info[ESTR] = e_diff
                    info[ESTA] = e_accuracy
                info[NEST] = e.num_estimates
                info[ND] = e.num_drifts
                info[JDD] = e.drift_time
                drift_times.append(e.drift_time)
                if e.drift_time > 0:
                    total_drifters += 1
                    if e.drift_time < 15 * 60:
                        sub15mn += 1
                    elif e.drift_time < 3600:
                        sub1hr += 1
                    elif e.drift_time < 3 * 3600:
                        sub3hr += 1
                    else:
                        sup3hr += 1
                else:
                    total_nondrifters += 1
                einfo[EJ].append(info)
        info = {}
        info[Ds15mn] = sub15mn
        info[Ds1hr] = sub1hr
        info[Ds3hr] = sub3hr
        info[Do3hr] = sup3hr
        info[NJD] = total_drifters
        info[NJND] = total_nondrifters
        if drift_times:
            info[DDm] = min(drift_times)
            info[DDM] = max(drift_times)
            info[DDA] = sum(drift_times) / len(drift_times)
            info[DD50] = sorted(drift_times)[len(drift_times) / 2]
        einfo[ESTS] = info
        return einfo

    def summary(self, cycles=None, showjobs=False):
        """
        Scheduler log summary
        """
        if self.estimated_parsing_enabled:
            self.info[EST] = self.summarize_estimated_analysis()
            if self.parse_estimated_only:
                return self.info
        if cycles is None and self.cycles is not None:
            cycles = self.cycles
        num_cycle = 0
        run = 0
        failed = 0
        total_considered = 0
        run_tm = []
        cycle_duration = []
        min_duration = None
        max_duration = None
        mint = maxt = None
        calendarduration = 0
        schedsolvertime = 0
        for c in cycles:
            c.summary(showjobs)
            self.info[num_cycle] = c.info
            run += len(c.sched_job_run.keys())
            run_tm.extend(c.sched_job_run.values())
            failed += len(c.run_failure.keys())
            total_considered += c.num_considered
            if max_duration is None or c.duration > max_duration:
                max_duration = c.duration
                maxt = time.strftime("%Y-%m-%d %H:%M:%S",
                                     time.localtime(c.start))
            if min_duration is None or c.duration < min_duration:
                min_duration = c.duration
                mint = time.strftime("%Y-%m-%d %H:%M:%S",
                                     time.localtime(c.start))
            cycle_duration.append(c.duration)
            num_cycle += 1
            calendarduration += sum(c.calendarduration.values())
            schedsolvertime += c.scheduler_solver_time
        run_rate = self.logutils.get_rate(sorted(run_tm))
        sorted_cd = sorted(cycle_duration)
        self.summary_info[NC] = len(cycles)
        self.summary_info[NJR] = run
        self.summary_info[NJFR] = failed
        self.summary_info[JRR] = run_rate
        self.summary_info[NJC] = total_considered
        self.summary_info[mCD] = self.logutils._duration(min_duration)
        self.summary_info[MCD] = self.logutils._duration(max_duration)
        if len(sorted_cd) > 0:
            self.summary_info[CDA] = self.logutils._duration(
                sum(sorted_cd) / len(sorted_cd))
            self.summary_info[CD25] = self.logutils._duration(
                self.logutils.percentile(sorted_cd, .25))
            self.summary_info[CD50] = self.logutils._duration(
                self.logutils.percentile(sorted_cd, .5))
            self.summary_info[CD75] = self.logutils._duration(
                self.logutils.percentile(sorted_cd, .75))
        if mint is not None:
            self.summary_info[mCT] = mint
        if maxt is not None:
            self.summary_info[MCT] = maxt
        self.summary_info[DUR] = self.logutils._duration(sum(cycle_duration))
        self.summary_info[TTC] = self.logutils._duration(calendarduration)
        self.summary_info[SST] = self.logutils._duration(schedsolvertime)
        self.summary_info[VER] = ",".join(self.version)
        self.info['summary'] = dict(self.summary_info.items())
        return self.info


class PBSCycleInfo(object):

    def __init__(self):
        self.info = {}
        # Time between end and start of a cycle; the end may come from an
        # alarm or a signal, not only Leaving - Starting
        self.duration = 0
        # Time of a "Starting scheduling cycle" message
        self.start = 0
        # Time of a "Leaving scheduling cycle" message
        self.end = 0
        # Time at which a "Considering job to run" message was logged
        self.consider = {}
        # Number of jobs considered
        self.num_considered = 0
        # Time at which a job run message is logged in the scheduler.
        # This includes the time taken by the server to start the job.
        self.sched_job_run = {}
        # Jobs added to the calendar, i.e., backfilling jobs
        self.calendared_jobs = {}
        # Time between "Considering job to run" and "Job run" messages
        self.runduration = {}
        # Time to determine that a job couldn't run
        self.cantrunduration = {}
        # Jobs preempted in order to run a high-priority job
        self.preempted_jobs = {}
        # Time between "Considering job to run" and the server logging
        # "Job Run at request..."
        self.inschedduration = {}
        # Total time spent in the scheduler solver:
        # insched + cantrun + calendar
        self.scheduler_solver_time = 0
        # Error 15XXX in the sched log corresponds to a failure to run
        self.run_failure = {}
        # Number of "Job failed to be preempted" messages
        self.num_preempt_failure = 0
        # Number of "Job preempted by" messages
        self.num_preempted = 0
        # Time between the start of the cycle and the first job
        # considered to run
        self.queryduration = 0
        # The order in which jobs are considered
        self.political_order = []
        # Time to calendar each job
        self.calendarduration = {}
        self.lastjob = None

    def summary(self, showjobs=False):
        """
        Summary regarding cycle
        """
        self.info[CST] = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(self.start))
        self.info[CD] = PBSLogUtils._duration(self.end - self.start)
        self.info[QD] = PBSLogUtils._duration(self.queryduration)
        # the number of jobs considered may differ from the length of
        # the consider dictionary because job arrays are considered once
        # per subjob using the parent array job id
        self.info[NJC] = self.num_considered
        self.info[NJR] = len(self.sched_job_run.keys())
        self.info[NJFR] = len(self.run_failure)
        self.scheduler_solver_time = (sum(self.inschedduration.values()) +
                                      sum(self.cantrunduration.values()) +
                                      sum(self.calendarduration.values()))
        self.info[SST] = self.scheduler_solver_time
        self.info[NJCAL] = len(self.calendared_jobs.keys())
        self.info[NJFP] = self.num_preempt_failure
        self.info[NJP] = self.num_preempted
        self.info[TTC] = sum(self.calendarduration.values())
        if showjobs:
            for j in self.consider.keys():
                s = {JID: j}
                if j in self.runduration:
                    s[T2R] = self.runduration[j]
                if j in self.cantrunduration:
                    s[T2D] = self.cantrunduration[j]
                if j in self.inschedduration:
                    s[TiS] = self.inschedduration[j]
                if j in self.calendarduration:
                    s[TTC] = self.calendarduration[j]
                if 'jobs' in self.info:
                    self.info['jobs'].append(s)
                else:
                    self.info['jobs'] = [s]
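
# Sketch of the per-job records built above (values are illustrative):
# with showjobs=True, each considered job contributes one entry to
# info['jobs'], e.g. {JID: '1234.server', T2R: 0.02, TiS: 0.01}, where a
# timing key is present only when the job appears in the corresponding
# duration map.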


class PBSMoMLog(PBSLogAnalyzer):
    """
    Container and Parser of a PBS ``MoM`` log
    """
    tm_tag = re.compile(tm_re)
    mom_run_tag = re.compile(tm_re + ".*" + job_re + ".*;Started, pid.*")
    mom_end_tag = re.compile(tm_re + ".*" + job_re +
                             ".*;delete job request received.*")
    mom_enquejob_tag = re.compile(tm_re + ".*;Type 5 .*")
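    # Illustrative records the patterns above target (ids and pids are
    # made up; each line begins with the usual timestamp matched by
    # tm_re):
    #     ...;1234.server;Started, pid = 4321          -> mom_run_tag
    #     ...;1234.server;delete job request received  -> mom_end_tag
    #     ...;Type 5 request received ...              -> mom_enquejob_tag
    # the 'Type 5' request is treated here as a job being enqueued on
    # this MoM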

    def __init__(self, filename=None, hostname=None, show_progress=False):
        self.filename = filename
        self.hostname = hostname
        self.show_progress = show_progress
        self.start = []
        self.end = []
        self.queued = []
        self.info = {}
        self.version = []

    def comp_analyze(self, rec, start, end):
        m = self.mom_run_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            if ((start is None and end is None) or
                    self.logutils.in_range(tm, start, end)):
                self.start.append(tm)
                return PARSER_OK_CONTINUE
            elif end is not None and tm > end:
                return PARSER_OK_STOP
        m = self.mom_end_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            if ((start is None and end is None) or
                    self.logutils.in_range(tm, start, end)):
                self.end.append(tm)
                return PARSER_OK_CONTINUE
            elif end is not None and tm > end:
                return PARSER_OK_STOP
        m = self.mom_enquejob_tag.match(rec)
        if m:
            tm = self.logutils.convert_date_time(m.group('datetime'))
            if ((start is None and end is None) or
                    self.logutils.in_range(tm, start, end)):
                self.queued.append(tm)
                return PARSER_OK_CONTINUE
            elif end is not None and tm > end:
                return PARSER_OK_STOP
        if 'pbs_version=' in rec:
            version = rec.split('pbs_version=')[1].strip()
            if version not in self.version:
                self.version.append(version)
        return PARSER_OK_CONTINUE
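    # Note on the return codes used above (shared across this module):
    # PARSER_OK_CONTINUE keeps the record-by-record scan going, while
    # PARSER_OK_STOP short-circuits it once a record's timestamp falls
    # past the requested end of the time window.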

    def summary(self):
        """
        Mom log summary
        """
        run_rate = self.logutils.get_rate(self.start)
        queue_rate = self.logutils.get_rate(self.queued)
        end_rate = self.logutils.get_rate(self.end)
        self.info[NJQ] = len(self.queued)
        self.info[NJR] = len(self.start)
        self.info[NJE] = len(self.end)
        self.info[JRR] = run_rate
        self.info[JSR] = queue_rate
        self.info[JER] = end_rate
        self.info[VER] = ",".join(self.version)
        return self.info


class PBSAccountingLog(PBSLogAnalyzer):
    """
    Container and Parser of a PBS accounting log
    """
    tm_tag = re.compile(tm_re)
    record_tag = re.compile(r"""
                        (?P<date>\d\d/\d\d/\d{4,4})[\s]+
                        (?P<time>\d\d:\d\d:\d\d);
                        (?P<type>[A-Z]);
                        (?P<id>[0-9\[\]].*);
                        (?P<msg>.*)
                        """, re.VERBOSE)
    S_sub_record_tag = re.compile(r"""
                        .*user=(?P<user>[\w\d]+)[\s]+
                        .*qtime=(?P<qtime>[0-9]+)[\s]+
                        .*start=(?P<start>[0-9]+)[\s]+
                        .*exec_host=(?P<exechost>[\[\],\-\=\/\.\w/*\d\+]+)[\s]+
                        .*Resource_List.ncpus=(?P<ncpus>[0-9]+)[\s]+
                        .*
                        """, re.VERBOSE)
    E_sub_record_tag = re.compile(r"""
                        .*user=(?P<user>[\w\d]+)[\s]+
                        .*qtime=(?P<qtime>[0-9]+)[\s]+
                        .*start=(?P<start>[0-9]+)[\s]+
                        .*exec_host=(?P<exechost>[\[\],\-\=\/\.\w/*\d\+]+)[\s]+
                        .*Resource_List.ncpus=(?P<ncpus>[0-9]+)[\s]+
                        .*resources_used.walltime=(?P<walltime>[0-9:]+)
                        .*
                        """, re.VERBOSE)
    __E_sub_record_tag = re.compile(r"""
                        .*user=(?P<user>[\w\d]+)[\s]+
                        .*qtime=(?P<qtime>[0-9]+)[\s]+
                        .*start=(?P<start>[0-9]+)[\s]+
                        .*exec_host=(?P<exechost>[\[\],\-\=\/\.\w/*\d\+]+)[\s]+
                        .*Resource_List.ncpus=(?P<ncpus>[0-9]+)[\s]+
                        .*resources_used.walltime=(?P<walltime>[0-9:]+)
                        .*
                        """, re.VERBOSE)
    sub_record_tag = re.compile(r"""
                        .*qtime=(?P<qtime>[0-9]+)[\s]+
                        .*start=(?P<start>[0-9]+)[\s]+
                        .*exec_host=(?P<exechost>[\[\],\-\=\/\.\w/*\d\+]+)[\s]+
                        .*exec_vnode=(?P<execvnode>[\(\)\[\],:\-\=\/\.\w/*\d\+]+)[\s]+
                        .*Resource_List.ncpus=(?P<ncpus>[\d]+)[\s]+
                        .*
                        """, re.VERBOSE)
    logger = logging.getLogger(__name__)
    utils = BatchUtils()

    def __init__(self, filename=None, hostname=None, show_progress=False):
        self.filename = filename
        self.hostname = hostname
        self.show_progress = show_progress
        self.record_tm = []
        self.entries = {}
        self.queue = []
        self.start = []
        self.end = []
        self.wait_time = []
        self.run_time = []
        self.job_node_size = []
        self.job_cpu_size = []
        self.used_cph = 0
        self.nodes_cph = 0
        self.used_nph = 0
        self.jobs_started = []
        self.jobs_ended = []
        self.users = {}
        self.tmp_wait_time = {}
        self.duration = 0
        self.utilization_parsing = False
        self.running_jobs_parsing = False
        self.job_info_parsing = False
        self.accounting_workload_parsing = False
        self._total_ncpus = 0
        self._num_nodes = 0
        self._running_jobids = []
        self._server = None
        self.running_jobs = {}
        self.job_start = {}
        self.job_end = {}
        self.job_nodes = {}
        self.job_cpus = {}
        self.job_rectypes = {}
        self.job_attrs = {}
        self.parser_errors = 0
        self.info = {}

    def enable_running_jobs_parsing(self):
        """
        Enable parsing for running jobs
        """
        self.running_jobs_parsing = True

    def enable_utilization_parsing(self, hostname=None, nodesfile=None,
                                   jobsfile=None):
        """
        Enable utilization parsing

        :param hostname: Hostname of the machine
        :type hostname: str or None
        :param nodesfile: optional file containing output of
                          pbsnodes -av
        :type nodesfile: str or None
        :param jobsfile: optional file containing output of
                         qstat -f
        :type jobsfile: str or None
        """
        self.utilization_parsing = True
        self.process_nodes_data(hostname, nodesfile, jobsfile)

    def enable_job_info_parsing(self):
        """
        Enable job information parsing
        """
        self.job_info_res = {}
        self.job_info_parsing = True

    def enable_accounting_workload_parsing(self):
        """
        Enable accounting workload parsing
        """
        self.accounting_workload_parsing = True
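    # Sketch (illustrative): with accounting workload parsing enabled,
    # each E record fills self.job_attrs keyed by job id with the decoded
    # key=value attributes of the record, plus the derived 'running_time'
    # and 'schedselect' entries set in accounting_parsing() below.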

    def process_nodes_data(self, hostname=None, nodesfile=None,
                           jobsfile=None):
        """
        Get job and node information by stat'ing and parsing node
        data from the server.

        Compute the number of nodes and populate a list of running
        job ids on those nodes.

        :param hostname: The host to query
        :type hostname: str or None
        :param nodesfile: optional file containing output of
                          pbsnodes -av
        :type nodesfile: str or None
        :param jobsfile: optional file containing output of
                         qstat -f
        :type jobsfile: str or None

        The node data is needed to compute counts of nodes and cpus.
        The job data is needed to compute the amount of resources
        requested.
        """
        if nodesfile or jobsfile:
            self._server = Server(diagmap={NODE: nodesfile, JOB: jobsfile})
        else:
            self._server = Server(hostname)
        ncpus = self._server.counter(NODE, 'resources_available.ncpus',
                                     grandtotal=True, level=logging.DEBUG)
        if 'resources_available.ncpus' in ncpus:
            self._total_ncpus = ncpus['resources_available.ncpus']
        self._num_nodes = len(self._server.status(NODE))
        jobs = self._server.status(NODE, 'jobs')
        running_jobids = []
        for cur_job in jobs:
            if 'jobs' not in cur_job:
                continue
            job = cur_job['jobs']
            jlist = job.split(',')
            for j in jlist:
                # a node's jobs attribute lists 'jobid/subscript' tokens;
                # keep only the job id
                running_jobids.append(j.split('/')[0].strip())
        self._running_jobids = list(set(running_jobids))
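    # Sketch (file names are hypothetical): utilization parsing can be
    # primed from saved diagnostic output rather than a live server:
    #
    #     acct = PBSAccountingLog(filename='<accounting log>')
    #     acct.enable_utilization_parsing(nodesfile='pbsnodes_av.out',
    #                                     jobsfile='qstat_f.out')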

    def comp_analyze(self, rec, start, end, **kwargs):
        if self.job_info_parsing:
            return self.job_info(rec)
        else:
            return self.accounting_parsing(rec, start, end)

    def accounting_parsing(self, rec, start, end):
        """
        Parsing accounting log
        """
        r = self.record_tag.match(rec)
        if not r:
            return PARSER_ERROR_CONTINUE
        tm = self.logutils.convert_date_time(r.group('date') +
                                             ' ' + r.group('time'))
        if ((start is None and end is None) or
                self.logutils.in_range(tm, start, end)):
            self.record_tm.append(tm)
            rec_type = r.group('type')
            jobid = r.group('id')
            if not self.accounting_workload_parsing and rec_type == 'S':
                # Precompute metrics about the S record just in case
                # it does not have an E record. The differences are
                # resolved after all records are processed
                if jobid in self._running_jobids:
                    self._running_jobids.remove(jobid)
                m = self.S_sub_record_tag.match(r.group('msg'))
                if m:
                    self.users[jobid] = m.group('user')
                    qtime = int(m.group('qtime'))
                    starttime = int(m.group('start'))
                    ncpus = int(m.group('ncpus'))
                    self.job_cpus[jobid] = ncpus
                    if starttime != 0 and qtime != 0:
                        self.tmp_wait_time[jobid] = starttime - qtime
                    self.job_start[jobid] = starttime
                    ehost = m.group('exechost')
                    self.job_nodes[jobid] = ResourceResv.get_hosts(ehost)
            elif rec_type == 'E':
                if self.accounting_workload_parsing:
                    try:
                        msg = r.group('msg').split()
                        attrs = dict([l.split('=', 1) for l in msg])
                    except Exception:
                        self.parser_errors += 1
                        return PARSER_OK_CONTINUE
                    for k in attrs.keys():
                        attrs[k] = self.utils.decode_value(attrs[k])
                    running_time = (int(attrs['end']) - int(attrs['start']))
                    attrs['running_time'] = str(running_time)
                    attrs['schedselect'] = attrs['Resource_List.select']
                    if 'euser' not in attrs:
                        attrs['euser'] = 'unknown_user'
                    attrs['id'] = r.group('id')
                    self.job_attrs[r.group('id')] = attrs
                m = self.E_sub_record_tag.match(r.group('msg'))
                if m:
                    if jobid not in self.users:
                        self.users[jobid] = m.group('user')
                    ehost = m.group('exechost')
                    self.job_nodes[jobid] = ResourceResv.get_hosts(ehost)
                    ncpus = int(m.group('ncpus'))
                    self.job_cpus[jobid] = ncpus
                    self.job_end[jobid] = tm
                    qtime = int(m.group('qtime'))
                    starttime = int(m.group('start'))
                    if starttime != 0 and qtime != 0:
                        # jobs enqueued prior to start of time range
                        # considered should be reset to start of time
                        # range. Only matters when computing
                        # utilization
                        if (self.utilization_parsing and
                                qtime < self.record_tm[0]):
                            qtime = self.record_tm[0]
                            if starttime < self.record_tm[0]:
                                starttime = self.record_tm[0]
                        self.wait_time.append(starttime - qtime)
                    if m.group('walltime'):
                        try:
                            walltime = self.logutils.convert_hhmmss_time(
                                m.group('walltime').strip())
                            self.run_time.append(walltime)
                        except Exception:
                            pass
                    else:
                        walltime = tm - starttime
                        self.run_time.append(walltime)
                    if self.utilization_parsing:
                        self.used_cph += ncpus * (walltime / 60)
                        if self.utils:
                            self.used_nph += (len(self.job_nodes[jobid]) *
                                              (walltime / 60))
            elif rec_type == 'Q':
                self.queue.append(tm)
            elif rec_type == 'D':
                if jobid not in self.job_end:
                    self.job_end[jobid] = tm
        elif end is not None and tm > end:
            return PARSER_OK_STOP
        return PARSER_OK_CONTINUE

    def epilogue(self, line):
        if self.running_jobs_parsing or self.accounting_workload_parsing:
            return
        if len(self.record_tm) > 0:
            last_record_tm = self.record_tm[len(self.record_tm) - 1]
            self.duration = last_record_tm - self.record_tm[0]
            self.info[DUR] = self.logutils._duration(self.duration)
            self.jobs_started = list(self.job_start.keys())
            self.jobs_ended = list(self.job_end.keys())
            # materialize real lists: dict views and map objects could
            # not be appended to in the loop below
            self.job_node_size = [len(n) for n in self.job_nodes.values()]
            self.job_cpu_size = list(self.job_cpus.values())
            self.start = sorted(self.job_start.values())
            self.end = sorted(self.job_end.values())
            # list of jobs that have not yet ended, those are jobs that
            # have an S record but no E record. We port back the
            # precomputed metrics from the S record into the data to
            # "publish"
            sjobs = list(set(self.jobs_started).difference(self.jobs_ended))
            for job in sjobs:
                if job in self.tmp_wait_time:
                    self.wait_time.append(self.tmp_wait_time[job])
                if job in self.job_nodes:
                    self.job_node_size.append(len(self.job_nodes[job]))
                if job in self.job_cpus:
                    self.job_cpu_size.append(self.job_cpus[job])
                if self.utilization_parsing:
                    if job in self.job_start:
                        if job in self.job_cpus:
                            self.used_cph += self.job_cpus[job] * \
                                ((last_record_tm -
                                  self.job_start[job]) / 60)
                        if job in self.job_nodes:
                            self.used_nph += len(self.job_nodes[job]) * \
                                ((last_record_tm -
                                  self.job_start[job]) / 60)
            # Process jobs currently running, those may have an S record
            # that is older than the time window considered or not.
            # If they have an S record, then they were already processed
            # by the S record routine, otherwise, they are processed here
            if self.utilization_parsing:
                first_record_tm = self.record_tm[0]
                a = {'job_state': (EQ, 'R'),
                     'Resource_List.ncpus': (SET, ''),
                     'exec_host': (SET, ''),
                     'stime': (SET, '')}
                alljobs = self._server.status(JOB, a)
                for job in alljobs:
                    # the running_jobids is populated from the node's jobs
                    # attribute. If a job id is not in the running jobids
                    # list, then its S record was already processed
                    if job['id'] not in self._running_jobids:
                        continue
                    if ('job_state' not in job or
                            'Resource_List.ncpus' not in job or
                            'exec_host' not in job or 'stime' not in job):
                        continue
                    # split to catch a customer tweak
                    stime = int(job['stime'].split()[0])
                    if stime < first_record_tm:
                        stime = first_record_tm
                    # accrue in minutes, consistent with the other
                    # used_cph/used_nph accruals in this class
                    self.used_cph += int(job['Resource_List.ncpus']) * \
                        ((last_record_tm - stime) / 60)
                    nodes = len(self.utils.parse_exechost(
                        job['exec_host']))
                    self.used_nph += nodes * ((last_record_tm - stime) / 60)

    def job_info(self, rec):
        """
        PBS Job information
        """
        m = self.record_tag.match(rec)
        if m:
            d = {}
            if m.group('type') == 'E':
                if hasattr(self, 'jobid') and self.jobid != m.group('id'):
                    return PARSER_OK_CONTINUE
                if not hasattr(self, 'job_info_res'):
                    self.job_info_res = {}
                for a in m.group('msg').split():
                    (k, v) = a.split('=', 1)
                    d[k] = v
                self.job_info_res[m.group('id')] = d
        return PARSER_OK_CONTINUE
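    # Sketch (values are illustrative): with enable_job_info_parsing()
    # active, comp_analyze() routes records here and job_info_res ends up
    # keyed by job id, mapping each key=value token of the E record to
    # its raw string value, e.g.
    #     job_info_res['1234.server']['resources_used.walltime']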

    def finished_jobs_nodes(self, last=None, tm_range=None):
        """
        :param tm_range: a tuple of times where the first item is the
                         start time and the second item is the end time
                         to consider
        :param last: If tm_range is None and last is specified, a
                     time range from now till 'last' seconds from
                     now is used as the range of time to consider
        :returns: A dictionary of jobs that ended in the time range
                  as keys, and nodes (hostnames) on which those jobs
                  were running as values.
        """
        if self.filename is None:
            self.logger.error('A filename is required, exiting')
            return
        if tm_range is None and last is not None:
            tm_range = (time.time(), time.time() - last)
        if tm_range is None or len(tm_range) != 2:
            self.logger.error(
                'tm_range must be a tuple of start and end times')
            return
        job_nodes = {}
        f = FileUtils(self.filename, FILE_TAIL)
        # the file is being tailed so we look at the start and end record
        # in the 'opposite' range
        start = tm_range[1]
        end = tm_range[0]
        while True:
            records = f.next()
            if records is None:
                break
            for rec in records:
                r = self.record_tag.match(rec)
                if r:
                    tm = self.logutils.convert_date_time(
                        r.group('date') + ' ' + r.group('time'))
                    if not self.logutils.in_range(tm, start, end):
                        continue
                    rec_type = r.group('type')
                    if rec_type == 'E':
                        jobid = r.group('id')
                        m = self.sub_record_tag.match(r.group('msg'))
                        if m and self.utils:
                            ehost = m.group('exechost')
                            nodes = ResourceResv.get_hosts(ehost)
                            job_nodes[jobid] = nodes
        return job_nodes
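    # Usage sketch (the log path is hypothetical): map jobs that ended in
    # the last hour to the hosts they ran on.
    #
    #     acct = PBSAccountingLog(filename='<path to accounting log>')
    #     nodes_by_job = acct.finished_jobs_nodes(last=3600)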

    def summary(self):
        """
        Accounting log summary
        """
        if self.running_jobs_parsing or self.accounting_workload_parsing:
            return
        run_rate = self.logutils.get_rate(self.start)
        queue_rate = self.logutils.get_rate(self.queue)
        end_rate = self.logutils.get_rate(self.end)
        self.info[NJQ] = len(self.queue)
        self.info[NJR] = len(self.start)
        self.info[NJE] = len(self.end)
        self.info[JRR] = run_rate
        self.info[JSR] = queue_rate
        self.info[JER] = end_rate
        if len(self.wait_time) > 0:
            wt = sorted(self.wait_time)
            wta = float(sum(self.wait_time)) / len(self.wait_time)
            self.info[JWTm] = self.logutils._duration(min(wt))
            self.info[JWTM] = self.logutils._duration(max(wt))
            self.info[JWTA] = self.logutils._duration(wta)
            self.info[JWT25] = self.logutils._duration(
                self.logutils.percentile(wt, .25))
            self.info[JWT50] = self.logutils._duration(
                self.logutils.percentile(wt, .5))
            self.info[JWT75] = self.logutils._duration(
                self.logutils.percentile(wt, .75))
        if len(self.run_time) > 0:
            rt = sorted(self.run_time)
            self.info[JRTm] = self.logutils._duration(min(rt))
            self.info[JRT25] = self.logutils._duration(
                self.logutils.percentile(rt, 0.25))
            self.info[JRT50] = self.logutils._duration(
                self.logutils.percentile(rt, 0.50))
            self.info[JRTA] = self.logutils._duration(
                str(sum(rt) / len(rt)))
            self.info[JRT75] = self.logutils._duration(
                self.logutils.percentile(rt, 0.75))
            self.info[JRTM] = self.logutils._duration(max(rt))
        if len(self.job_node_size) > 0:
            js = sorted(self.job_node_size)
            self.info[JNSm] = min(js)
            self.info[JNS25] = self.logutils.percentile(js, 0.25)
            self.info[JNS50] = self.logutils.percentile(js, 0.50)
            self.info[JNSA] = str("%.2f" % (float(sum(js)) / len(js)))
            self.info[JNS75] = self.logutils.percentile(js, 0.75)
            self.info[JNSM] = max(js)
        if len(self.job_cpu_size) > 0:
            js = sorted(self.job_cpu_size)
            self.info[JCSm] = min(js)
            self.info[JCS25] = self.logutils.percentile(js, 0.25)
            self.info[JCS50] = self.logutils.percentile(js, 0.50)
            self.info[JCSA] = str("%.2f" % (float(sum(js)) / len(js)))
            self.info[JCS75] = self.logutils.percentile(js, 0.75)
            self.info[JCSM] = max(js)
        if self.utilization_parsing:
            ncph = self._total_ncpus * self.duration
            nph = self._num_nodes * self.duration
            if ncph > 0:
                self.info[UNCPUS] = str("%.2f" %
                                        (100 * float(self.used_cph) / ncph) +
                                        '%')
            if nph > 0:
                self.info[UNODES] = str("%.2f" %
                                        (100 * float(self.used_nph) / nph) +
                                        '%')
            self.info[CPH] = self.used_cph
            self.info[NPH] = self.used_nph
        self.info[USRS] = len(set(self.users.values()))
        return self.info
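    # Sketch of a full utilization pass (names and paths are
    # hypothetical; record feeding is presumably driven by the
    # PBSLogAnalyzer machinery defined earlier in this module):
    #
    #     acct = PBSAccountingLog(filename='<accounting log>')
    #     acct.enable_utilization_parsing(hostname='<server host>')
    #     ... feed records through comp_analyze() and epilogue() ...
    #     info = acct.summary()   # includes UNCPUS/UNODES percentages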