# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
from tests.functional import *


class TestPbsAccumulateRescUsed(TestFunctional):

    """
    This tests the feature in PBS that enables mom hooks to accumulate
    resources_used values for resources besides cput, cpupercent, and mem.
    This includes accumulation of custom resources. The mom hooks supporting
    this feature are: exechost_periodic, execjob_prologue,
    and execjob_epilogue.

    PRE: Have a cluster of PBS with 3 mom hosts, with an exechost_startup
    hook that adds custom resources.

    POST: When a job ends, accounting_logs reflect the aggregated
    resources_used values. And with job_history_enable=true, one
    can do a 'qstat -x -f <jobid>' to obtain information of a previous
    job.
    """
    # Class variables

    def setUp(self):
        TestFunctional.setUp(self)
        self.logger.info("len moms = %d" % (len(self.moms)))
        if len(self.moms) != 3:
            usage_string = 'test requires 3 MoMs as input, ' + \
                           'use -p moms=<mom1>:<mom2>:<mom3>'
            self.skip_test(usage_string)

        # PBSTestSuite returns the moms passed in as parameters as a
        # dictionary of hostname and MoM object
        self.momA = self.moms.values()[0]
        self.momB = self.moms.values()[1]
        self.momC = self.moms.values()[2]
        self.momA.delete_vnode_defs()
        self.momB.delete_vnode_defs()
        self.momC.delete_vnode_defs()

        self.hostA = self.momA.shortname
        self.hostB = self.momB.shortname
        self.hostC = self.momC.shortname

        rc = self.server.manager(MGR_CMD_DELETE, NODE, None, "")
        self.assertEqual(rc, 0)

        rc = self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostA)
        self.assertEqual(rc, 0)

        rc = self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostB)
        self.assertEqual(rc, 0)

        rc = self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostC)
        self.assertEqual(rc, 0)

        # Give the moms a chance to contact the server.
        self.server.expect(NODE, {'state': 'free'}, id=self.hostA)
        self.server.expect(NODE, {'state': 'free'}, id=self.hostB)
        self.server.expect(NODE, {'state': 'free'}, id=self.hostC)

        # First set some custom resources via an exechost_startup hook.
        startup_hook_body = """
import pbs
e=pbs.event()
localnode=pbs.get_local_nodename()
e.vnode_list[localnode].resources_available['foo_i'] = 7
e.vnode_list[localnode].resources_available['foo_f'] = 5.0
e.vnode_list[localnode].resources_available['foo_str'] = "seventyseven"
"""
        hook_name = "start"
        a = {'event': "exechost_startup", 'enabled': 'True'}
        rv = self.server.create_import_hook(
            hook_name,
            a,
            startup_hook_body,
            overwrite=True)
        self.assertTrue(rv)

        self.momA.signal("-HUP")
        self.momB.signal("-HUP")
        self.momC.signal("-HUP")

        a = {'job_history_enable': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)

        # Next set some custom resources via qmgr -c 'create resource'
        attr = {}
        attr['type'] = 'string'
        attr['flag'] = 'h'
        r = 'foo_str2'
        rc = self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False)
        self.assertEqual(rc, 0)

        # Ensure the new resource is seen by all moms.
        self.momA.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momB.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momC.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)

        attr['type'] = 'string'
        attr['flag'] = 'h'
        r = 'foo_str3'
        rc = self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False)
        self.assertEqual(rc, 0)

        # Ensure the new resource is seen by all moms.
        self.momA.log_match("resourcedef;copy hook-related file")
        self.momB.log_match("resourcedef;copy hook-related file")
        self.momC.log_match("resourcedef;copy hook-related file")

        attr['type'] = 'string'
        attr['flag'] = 'h'
        r = 'foo_str4'
        rc = self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False)
        self.assertEqual(rc, 0)

        # Ensure the new resource is seen by all moms.
        self.momA.log_match("resourcedef;copy hook-related file")
        self.momB.log_match("resourcedef;copy hook-related file")
        self.momC.log_match("resourcedef;copy hook-related file")

        attr['type'] = 'string_array'
        attr['flag'] = 'h'
        r = 'stra'
        rc = self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False)
        self.assertEqual(rc, 0)

        # Give the moms a chance to receive the updated resource.
        # Ensure the new resource is seen by all moms.
        self.momA.log_match("resourcedef;copy hook-related file")
        self.momB.log_match("resourcedef;copy hook-related file")
        self.momC.log_match("resourcedef;copy hook-related file")

    def test_epilogue(self):
        """
        Test accumulation of resources of a multinode job from an
        execjob_epilogue hook.
        """
        self.logger.info("test_epilogue")
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook")
if e.job.in_ms_mom():
    e.job.resources_used["vmem"] = pbs.size("9gb")
    e.job.resources_used["foo_i"] = 9
    e.job.resources_used["foo_f"] = 0.09
    e.job.resources_used["foo_str"] = '{"seven":7}'
    e.job.resources_used["cput"] = 10
    e.job.resources_used["stra"] = '"glad,elated","happy"'
    e.job.resources_used["foo_str3"] = \
        \"\"\"{"a":6,"b":"some value #$%^&*@","c":54.4,"d":"32.5gb"}\"\"\"
    e.job.resources_used["foo_str2"] = "seven"
    e.job.resources_used["foo_str4"] = "eight"
else:
    e.job.resources_used["vmem"] = pbs.size("10gb")
    e.job.resources_used["foo_i"] = 10
    e.job.resources_used["foo_f"] = 0.10
    e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}'
    e.job.resources_used["foo_str2"] = '{"seven":7}'
    e.job.resources_used["cput"] = 20
    e.job.resources_used["stra"] = '"cucumbers,bananas"'
    e.job.resources_used["foo_str3"] = \"\"\""vn1":4,"vn2":5,"vn3":6\"\"\"
"""

        hook_name = "epi"
        a = {'event': "execjob_epilogue", 'enabled': 'True'}
        rv = self.server.create_import_hook(
            hook_name,
            a,
            hook_body,
            overwrite=True)
        self.assertTrue(rv)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': "scatter"}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time("10")
        jid = self.server.submit(j)

        # The results should show values for custom resources 'foo_i',
        # 'foo_f', 'foo_str', 'foo_str3', and builtin resources 'vmem' and
        # 'cput', accumulated according to the hook script, where MS defines
        # one value while the 2 sister moms define the same value. For
        # 'string' type, it will be a union of all values obtained from the
        # sister moms and the local mom, and the result will be in
        # JSON format.
        #
        # foo_str is for testing normal values.
        # foo_str2 is for testing a non-JSON format value received from MS.
        # foo_str3 is for testing a non-JSON format value received from a
        # sister mom.
        # foo_str4 is for testing MS-only set values.
        #
        # For the string_array type resource 'stra', it is not accumulated
        # but will be set to the last seen value from a mom epilogue hook.
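        # Per the hook above: foo_i = 9 (MS) + 10 + 10 (sisters) = 29,
        # foo_f = 0.09 + 0.10 + 0.10 = 0.29, vmem = 9gb + 10gb + 10gb = 29gb,
        # and cput = 10 + 20 + 20 = 50 seconds (00:00:50).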
        self.server.expect(JOB, {
            'job_state': 'F',
            'resources_used.foo_f': '0.29',
            'resources_used.foo_i': '29',
            'resources_used.foo_str4': "eight",
            'resources_used.stra': "\"glad,elated\",\"happy\"",
            'resources_used.vmem': '29gb',
            'resources_used.cput': '00:00:50',
            'resources_used.ncpus': '3'},
            extend='x', offset=10, attrop=PTL_AND, id=jid)

        foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9}
        qstat = self.server.status(
            JOB, 'resources_used.foo_str', id=jid, extend='x')
        foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str'])
        foo_str_dict_out = eval(foo_str_dict_out_str)
        self.assertTrue(foo_str_dict_in == foo_str_dict_out)

        # resources_used.foo_str3 must not be set since a sister value is not
        # of JSON-format.
        self.server.expect(JOB, 'resources_used.foo_str3',
                           op=UNSET, extend='x', id=jid)

        self.momA.log_match(
            "Job %s resources_used.foo_str3 cannot be " % (jid,) +
            "accumulated: value '\"vn1\":4,\"vn2\":5,\"vn3\":6' " +
            "from mom %s not JSON-format" % (self.hostB,))

        # resources_used.foo_str2 must not be set.
        self.server.expect(JOB, 'resources_used.foo_str2', op=UNSET, id=jid)

        self.momA.log_match(
            "Job %s resources_used.foo_str2 cannot be " % (jid,) +
            "accumulated: value 'seven' from mom %s " % (self.hostA,) +
            "not JSON-format")

        # Match accounting_logs entry
        acctlog_match = 'resources_used.foo_f=0.29'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.foo_i=29'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = "resources_used.foo_str='%s'" % (foo_str_dict_out_str,)
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.vmem=29gb'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.cput=00:00:50'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        # Ensure resources_used.foo_str2 is not reported in accounting_logs
        # since it's unset due to a non-JSON-format value.
        acctlog_match = 'resources_used.foo_str2='
        self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match),
                                     regexp=True, n=100, existence=False)

        acctlog_match = 'resources_used.foo_str4=eight'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.ncpus=3'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        # resources_used.foo_str3 must not show up in accounting_logs
        acctlog_match = 'resources_used.foo_str3='
        self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match),
                                     regexp=True, n=100, existence=False)

        acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

    def test_prologue(self):
        """
        Test accumulation of resources of a multinode job from an
        execjob_prologue hook.
        """
        self.logger.info("test_prologue")
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed prologue hook")
if e.job.in_ms_mom():
    e.job.resources_used["vmem"] = pbs.size("11gb")
    e.job.resources_used["foo_i"] = 11
    e.job.resources_used["foo_f"] = 0.11
    e.job.resources_used["foo_str"] = '{"seven":7}'
    e.job.resources_used["cput"] = 11
    e.job.resources_used["stra"] = '"glad,elated","happy"'
    e.job.resources_used["foo_str3"] = \
        \"\"\"{"a":6,"b":"some value #$%^&*@","c":54.4,"d":"32.5gb"}\"\"\"
    e.job.resources_used["foo_str2"] = "seven"
    e.job.resources_used["foo_str4"] = "eight"
else:
    e.job.resources_used["vmem"] = pbs.size("12gb")
    e.job.resources_used["foo_i"] = 12
    e.job.resources_used["foo_f"] = 0.12
    e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}'
    e.job.resources_used["foo_str2"] = '{"seven":7}'
    e.job.resources_used["cput"] = 12
    e.job.resources_used["stra"] = '"cucumbers,bananas"'
    e.job.resources_used["foo_str3"] = \"\"\""vn1":4,"vn2":5,"vn3":6\"\"\"
"""

        hook_name = "prolo"
        a = {'event': "execjob_prologue", 'enabled': 'True'}
        rv = self.server.create_import_hook(
            hook_name,
            a,
            hook_body,
            overwrite=True)
        self.assertTrue(rv)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        # The pbsdsh calls are what allow a first task to get spawned on
        # a sister mom, causing the execjob_prologue hook to execute there.
        j.create_script(
            "pbsdsh -n 1 hostname\n" + "pbsdsh -n 2 hostname\n" + "sleep 10\n")
        jid = self.server.submit(j)

        # The results should show values for custom resources 'foo_i',
        # 'foo_f', 'foo_str', 'foo_str3', and builtin resources 'vmem' and
        # 'cput', accumulated according to the hook script, where MS defines
        # one value while the 2 sister moms define the same value. For
        # 'string' type, it will be a union of all values obtained from the
        # sister moms and the local mom, and the result will be in
        # JSON format.
        #
        # foo_str is for testing normal values.
        # foo_str2 is for testing a non-JSON format value received from MS.
        # foo_str3 is for testing a non-JSON format value received from a
        # sister mom.
        # foo_str4 is for testing MS-only set values.
        #
        # For the string_array type resource 'stra', it is not accumulated
        # but will be set to the last seen value from a mom prologue hook.
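        # Per the hook above: foo_i = 11 (MS) + 12 + 12 (sisters) = 35,
        # foo_f = 0.11 + 0.12 + 0.12 = 0.35, vmem = 11gb + 12gb + 12gb = 35gb,
        # and cput = 11 + 12 + 12 = 35 seconds (00:00:35).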
        self.server.expect(JOB, {
            'job_state': 'F',
            'resources_used.foo_f': '0.35',
            'resources_used.foo_i': '35',
            'resources_used.foo_str4': "eight",
            'resources_used.stra': "\"glad,elated\",\"happy\"",
            'resources_used.vmem': '35gb',
            'resources_used.cput': '00:00:35',
            'resources_used.ncpus': '3'},
            extend='x', offset=10, attrop=PTL_AND, id=jid)

        foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9}
        qstat = self.server.status(
            JOB, 'resources_used.foo_str', id=jid, extend='x')
        foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str'])
        foo_str_dict_out = eval(foo_str_dict_out_str)
        self.assertTrue(foo_str_dict_in == foo_str_dict_out)

        # resources_used.foo_str3 must not be set since a sister value is
        # not of JSON-format.
        self.server.expect(JOB, 'resources_used.foo_str3',
                           op=UNSET, extend='x', id=jid)

        self.momA.log_match(
            "Job %s resources_used.foo_str3 cannot be " % (jid,) +
            "accumulated: value '\"vn1\":4,\"vn2\":5,\"vn3\":6' " +
            "from mom %s not JSON-format" % (self.hostB,))

        self.momA.log_match(
            "Job %s resources_used.foo_str3 cannot be " % (jid,) +
            "accumulated: value '\"vn1\":4,\"vn2\":5,\"vn3\":6' " +
            "from mom %s not JSON-format" % (self.hostC,))

        # Ensure resources_used.foo_str3 is not set since it has a
        # non-JSON format value.
        self.server.expect(JOB, 'resources_used.foo_str3', op=UNSET,
                           extend='x', id=jid)

        # resources_used.foo_str2 must not be set.
        self.server.expect(JOB, 'resources_used.foo_str2', op=UNSET, id=jid)

        self.momA.log_match(
            "Job %s resources_used.foo_str2 cannot be " % (jid,) +
            "accumulated: value 'seven' from " +
            "mom %s not JSON-format" % (self.hostA,))

        # Match accounting_logs entry
        acctlog_match = 'resources_used.foo_f=0.35'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.foo_i=35'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = "resources_used.foo_str='%s'" % (foo_str_dict_out_str,)
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.vmem=35gb'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.cput=00:00:35'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        # resources_used.foo_str2 should not be reported in accounting_logs.
        acctlog_match = 'resources_used.foo_str2='
        self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match),
                                     regexp=True, n=100, existence=False)

        acctlog_match = 'resources_used.ncpus=3'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        # resources_used.foo_str3 must not show up in accounting_logs
        acctlog_match = 'resources_used.foo_str3='
        self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match),
                                     regexp=True, n=100, existence=False)

        acctlog_match = 'resources_used.foo_str4=eight'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

        acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"'
        self.server.accounting_match(
            "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

    def test_periodic(self):
        """
        Test accumulation of resources from an exechost_periodic hook.
        """
        self.logger.info("test_periodic")
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed periodic hook")
i = 0
l = []
for v in pbs.server().vnodes():
    pbs.logmsg(pbs.LOG_DEBUG, "node %s" % (v.name,))
    l.append(v.name)
local_node=pbs.get_local_nodename()
for jk in e.job_list.keys():
    if local_node == l[0]:
        e.job_list[jk].resources_used["vmem"] = pbs.size("11gb")
        e.job_list[jk].resources_used["foo_i"] = 11
        e.job_list[jk].resources_used["foo_f"] = 0.11
        e.job_list[jk].resources_used["foo_str"] = '{"seven":7}'
        e.job_list[jk].resources_used["cput"] = 11
        e.job_list[jk].resources_used["stra"] = '"glad,elated","happy"'
        e.job_list[jk].resources_used["foo_str3"] = \
            \"\"\"{"a":6,"b":"some value #$%^&*@","c":54.4,"d":"32.5gb"}\"\"\"
        e.job_list[jk].resources_used["foo_str2"] = "seven"
    elif local_node == l[1]:
        e.job_list[jk].resources_used["vmem"] = pbs.size("12gb")
        e.job_list[jk].resources_used["foo_i"] = 12
        e.job_list[jk].resources_used["foo_f"] = 0.12
        e.job_list[jk].resources_used["foo_str"] = '{"eight":8}'
        e.job_list[jk].resources_used["cput"] = 12
        e.job_list[jk].resources_used["stra"] = '"cucumbers,bananas"'
        e.job_list[jk].resources_used["foo_str2"] = '{"seven":7}'
        e.job_list[jk].resources_used["foo_str3"] = \
            \"\"\"{"vn1":4,"vn2":5,"vn3":6}\"\"\"
    else:
        e.job_list[jk].resources_used["vmem"] = pbs.size("13gb")
        e.job_list[jk].resources_used["foo_i"] = 13
        e.job_list[jk].resources_used["foo_f"] = 0.13
        e.job_list[jk].resources_used["foo_str"] = '{"nine":9}'
        e.job_list[jk].resources_used["foo_str2"] = '{"seven":7}'
        e.job_list[jk].resources_used["cput"] = 13
        e.job_list[jk].resources_used["stra"] = '"cucumbers,bananas"'
        e.job_list[jk].resources_used["foo_str3"] = \
            \"\"\"{"vn1":4,"vn2":5,"vn3":6}\"\"\"
"""

        hook_name = "period"
        a = {'event': "exechost_periodic", 'enabled': 'True', 'freq': 15}
        rv = self.server.create_import_hook(
            hook_name,
            a,
            hook_body,
            overwrite=True)
        self.assertTrue(rv)

        a = {'resources_available.ncpus': '2'}
        self.server.manager(MGR_CMD_SET, NODE, a, self.hostA,
                            expect=True)
        self.server.manager(MGR_CMD_SET, NODE, a, self.hostB,
                            expect=True)
        self.server.manager(MGR_CMD_SET, NODE, a, self.hostC,
                            expect=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time("35")
        jid1 = self.server.submit(j)
        jid2 = self.server.submit(j)

        for jid in [jid1, jid2]:
            # The results should show values for custom resources 'foo_i',
            # 'foo_f', 'foo_str', 'foo_str3', and builtin resources 'vmem'
            # and 'cput', accumulated according to the hook script, where
            # each of the three moms defines a value. For 'string' type,
            # it will be a union of all values obtained from the sister moms
            # and the local mom, and the result will be in JSON format.
            # foo_str is for testing normal values.
            # foo_str2 is for testing a non-JSON format value received
            # from one of the moms.
            # foo_str3 is for testing JSON format values received from all
            # the moms.
            #
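            # Per the hook above, each job accumulates one value per node:
            # foo_i = 11 + 12 + 13 = 36, foo_f = 0.11 + 0.12 + 0.13 = 0.36,
            # vmem = 11gb + 12gb + 13gb = 36gb, and cput = 36s (00:00:36).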
            self.server.expect(JOB, {
                'job_state': 'F',
                'resources_used.foo_f': '0.36',
                'resources_used.foo_i': '36',
                'resources_used.stra': "\"glad,elated\",\"happy\"",
                'resources_used.vmem': '36gb',
                'resources_used.cput': '00:00:36',
                'resources_used.ncpus': '3'},
                extend='x', offset=35, attrop=PTL_AND, id=jid)

            foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9}
            qstat = self.server.status(
                JOB, 'resources_used.foo_str', id=jid, extend='x')
            foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str'])
            foo_str_dict_out = eval(foo_str_dict_out_str)
            self.assertTrue(foo_str_dict_in == foo_str_dict_out)

            foo_str3_dict_in = {"a": 6, "b": "some value #$%^&*@",
                                "c": 54.4, "d": "32.5gb", "vn1": 4,
                                "vn2": 5, "vn3": 6}
            qstat = self.server.status(
                JOB, 'resources_used.foo_str3', id=jid, extend='x')
            foo_str3_dict_out_str = eval(qstat[0]['resources_used.foo_str3'])
            foo_str3_dict_out = eval(foo_str3_dict_out_str)
            self.assertTrue(foo_str3_dict_in == foo_str3_dict_out)

            # resources_used.foo_str2 must be unset since one of its values
            # is not of JSON-format.
            self.server.expect(JOB, 'resources_used.foo_str2', op=UNSET,
                               extend='x', id=jid)

            # Match accounting_logs entry
            acctlog_match = 'resources_used.foo_f=0.36'
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            acctlog_match = 'resources_used.foo_i=36'
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            acctlog_match = "resources_used.foo_str='%s'" % (
                foo_str_dict_out_str,)
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            acctlog_match = 'resources_used.vmem=36gb'
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            acctlog_match = 'resources_used.cput=00:00:36'
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            # resources_used.foo_str2 must not show in accounting_logs
            acctlog_match = 'resources_used.foo_str2='
            self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match),
                                         regexp=True, n=100, existence=False)

            acctlog_match = 'resources_used.ncpus=3'
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            acctlog_match = "resources_used.foo_str3='%s'" % (
                foo_str3_dict_out_str.replace('.', '\.').
                replace("#$%^&*@", "\#\$\%\^\&\*\@"))
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

            acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"'
            self.server.accounting_match(
                "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100)

    def test_resource_bool(self):
        """
        Test that boolean values are not getting aggregated.
        """
        # Create a boolean type resource
        attr = {}
        attr['type'] = 'boolean'
        self.server.manager(
            MGR_CMD_CREATE, RSC, attr,
            id='foo_bool', runas=ROOT_USER,
            logerr=False)

        hook_body = """
import pbs
e=pbs.event()
j=e.job
if j.in_ms_mom():
    j.resources_used["foo_bool"] = True
else:
    j.resources_used["foo_bool"] = False
"""

        hook_name = "epi_bool"
        a = {'event': "execjob_epilogue", 'enabled': "True"}
        self.server.create_import_hook(
            hook_name,
            a,
            hook_body,
            overwrite=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time("5")
        jid = self.server.submit(j)

        # foo_bool is True
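        # Booleans are not accumulated across moms: the MS sets True and the
        # sisters set False, and the job is expected to report the MS value.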
        a = {'resources_used.foo_bool': "True",
             'job_state': 'F'}
        self.server.expect(JOB, a, extend='x', offset=5, attrop=PTL_AND,
                           id=jid)

    def test_resource_invisible(self):
        """
        Test that value aggregation works the same for invisible resources.
        """
        # Set float and string_array to be invisible resources
        attr = {}
        attr['flag'] = 'ih'
        self.server.manager(
            MGR_CMD_SET, RSC, attr, id='foo_f', runas=ROOT_USER)
        self.server.manager(
            MGR_CMD_SET, RSC, attr, id='foo_str', runas=ROOT_USER)

        hook_body = """
import pbs
e=pbs.event()
j = e.job
if j.in_ms_mom():
    j.resources_used["foo_f"] = 2.114
    j.resources_used["foo_str"] = '{"one":1,"two":2}'
else:
    j.resources_used["foo_f"] = 3.246
    j.resources_used["foo_str"] = '{"two":2, "three":3}'
"""

        hook_name = "epi_invis"
        a = {'event': "execjob_epilogue", 'enabled': 'True'}
        self.server.create_import_hook(
            hook_name,
            a,
            hook_body,
            overwrite=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time("5")
        jid = self.server.submit(j)

        # Verify that values are accumulated for float and string array
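        # foo_f accumulates as 2.114 (MS) + 3.246 + 3.246 (sisters) = 8.606,
        # and foo_str becomes the JSON union of the per-mom dictionaries.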
        a = {'resources_used.foo_f': '8.606'}
        self.server.expect(JOB, a, extend='x', offset=5, id=jid)

        foo_str_dict_in = {"one": 1, "two": 2, "three": 3}
        qstat = self.server.status(
            JOB, 'resources_used.foo_str', id=jid, extend='x')
        foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str'])
        foo_str_dict_out = eval(foo_str_dict_out_str)
        self.assertEqual(foo_str_dict_in, foo_str_dict_out)

    def test_reservation(self):
        """
        Test that jobs inside reservations work the same way.
        """
        # Create non-host level resources from qmgr
        attr = {}
        attr['type'] = 'size'
        self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id='foo_i2', runas=ROOT_USER)

        # Ensure the new resource is seen by all moms.
        self.momA.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momB.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momC.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)

        attr['type'] = 'float'
        self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id='foo_f2', runas=ROOT_USER)

        # Ensure the new resource is seen by all moms.
        self.momA.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momB.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momC.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)

        attr['type'] = 'string_array'
        self.server.manager(
            MGR_CMD_CREATE, RSC, attr, id='stra2', runas=ROOT_USER)

        # Ensure the new resource is seen by all moms.
        self.momA.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momB.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)
        self.momC.log_match(
            "resourcedef;copy hook-related file", max_attempts=3)

        # Create an epilogue hook
        hook_body = """
import pbs
e = pbs.event()
j = e.job
pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook")
j.resources_used["foo_i"] = 2
j.resources_used["foo_i2"] = pbs.size(1000)
j.resources_used["foo_f"] = 1.02
j.resources_used["foo_f2"] = 2.01
j.resources_used["stra"] = '"happy"'
j.resources_used["stra2"] = '"glad"'
"""

        # Create and import the hook
        a = {'event': "execjob_epilogue", 'enabled': 'True'}
        self.server.create_import_hook(
            "epi", a, hook_body,
            overwrite=True)

        # Submit a reservation
        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.place': 'scatter',
             'reserve_start': time.time() + 10,
             'reserve_end': time.time() + 30}
        r = Reservation(TEST_USER, a)
        rid = self.server.submit(r)
        a = {'reserve_state': (MATCH_RE, "RESV_CONFIRMED|2")}
        self.server.expect(RESV, a, id=rid)
        rname = rid.split('.')

        # Submit a job inside the reservation
        a = {'Resource_List.select': '3:ncpus=1', ATTR_queue: rname[0]}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time(20)
        jid = self.server.submit(j)

        # Verify the resource values
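        # Each of the 3 moms runs the epilogue hook, so foo_i = 2 * 3 = 6,
        # foo_i2 = 1000b * 3 = 3kb, foo_f = 1.02 * 3 = 3.06, and
        # foo_f2 = 2.01 * 3 = 6.03.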
        a = {'resources_used.foo_i': '6',
             'resources_used.foo_i2': '3kb',
             'resources_used.foo_f': '3.06',
             'resources_used.foo_f2': '6.03',
             'resources_used.stra': "\"happy\"",
             'resources_used.stra2': "\"glad\"",
             'job_state': 'F'}
        self.server.expect(JOB, a, extend='x', attrop=PTL_AND,
                           offset=30, interval=1,
                           max_attempts=20, id=jid)

        # Restart the server and verify that the values are still the same
        self.server.restart()
        # Below is commented out due to a known PBS issue
        # self.server.expect(JOB, a, extend='x', id=jid)

    def test_server_restart(self):
        """
        Test that resource accumulation is not impacted
        if the server is restarted during job execution.
        """
        # Create a prologue hook
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed prologue hook")
if e.job.in_ms_mom():
    e.job.resources_used["vmem"] = pbs.size("11gb")
    e.job.resources_used["foo_i"] = 11
    e.job.resources_used["foo_f"] = 0.11
    e.job.resources_used["foo_str"] = '{"seven":7}'
    e.job.resources_used["cput"] = 11
    e.job.resources_used["stra"] = '"glad,elated","happy"'
    e.job.resources_used["foo_str4"] = "eight"
else:
    e.job.resources_used["vmem"] = pbs.size("12gb")
    e.job.resources_used["foo_i"] = 12
    e.job.resources_used["foo_f"] = 0.12
    e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}'
    e.job.resources_used["cput"] = 12
    e.job.resources_used["stra"] = '"cucumbers,bananas"'
"""

        hook_name = "prolo"
        a = {'event': "execjob_prologue", 'enabled': 'True'}
        self.server.create_import_hook(
            hook_name,
            a,
            hook_body,
            overwrite=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 20,
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        # The pbsdsh calls are what allow a first task to get spawned on
        # a sister mom, causing the execjob_prologue hook to execute there.
        j.create_script(
            "pbsdsh -n 1 hostname\n" +
            "pbsdsh -n 2 hostname\n" +
            "sleep 10\n")
        jid = self.server.submit(j)

        # Once the job has started running, restart the server
        self.server.expect(JOB, {'job_state': "R", "substate": 42}, id=jid)
        self.server.restart()

        # The job will be requeued and rerun. Verify that the resource
        # accumulation is the same as if the server had not been restarted.
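        # Per the prologue hook: foo_i = 11 (MS) + 12 + 12 (sisters) = 35,
        # foo_f = 0.35, vmem = 35gb, and cput = 00:00:35.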
        a = {'resources_used.foo_i': '35',
             'resources_used.foo_f': '0.35',
             'resources_used.vmem': '35gb',
             'resources_used.cput': '00:00:35',
             'resources_used.stra': "\"glad,elated\",\"happy\"",
             'resources_used.foo_str4': "eight",
             'job_state': 'F'}
        self.server.expect(JOB, a, extend='x',
                           offset=5, id=jid, interval=1, attrop=PTL_AND)

        foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9}
        qstat = self.server.status(
            JOB, 'resources_used.foo_str', id=jid, extend='x')
        foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str'])
        foo_str_dict_out = eval(foo_str_dict_out_str)
        self.assertEqual(foo_str_dict_in, foo_str_dict_out)

    def test_mom_down(self):
        """
        Test that resource accumulation is not impacted by a
        mom restart.
        """
        # Set node_fail_requeue to requeue the job
        self.server.manager(MGR_CMD_SET, SERVER,
                            {'node_fail_requeue': 10})

        hook_body = """
import pbs
e = pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed periodic hook")
for jj in e.job_list.keys():
    e.job_list[jj].resources_used["foo_i"] = 1
    e.job_list[jj].resources_used["foo_str"] = '{"happy":"true"}'
    e.job_list[jj].resources_used["stra"] = '"one","two"'
"""

        a = {'event': "exechost_periodic", 'enabled': 'True', 'freq': 10}
        self.server.create_import_hook(
            "period",
            a,
            hook_body,
            overwrite=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        jid1 = self.server.submit(j)

        # Submit a job that can never run
        a = {'Resource_List.select': '5:ncpus=1',
             'Resource_List.place': 'scatter'}
        j.set_attributes(a)
        jid2 = self.server.submit(j)

        # Wait approximately 10s for the hook to get executed, then
        # verify resources_used.foo_i
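        # The periodic hook sets foo_i = 1 on each of the 3 moms, so the
        # running job accumulates foo_i = 3; the queued job gets nothing.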
        self.server.expect(JOB, {'resources_used.foo_i': '3'},
                           offset=10, id=jid1, interval=1)
        self.server.expect(JOB, "resources_used.foo_i", op=UNSET, id=jid2)

        # Bring a sister mom down
        self.momB.stop()

        # Wait 20 more seconds for the periodic hook to run
        # more than once and verify that the value is still 3
        self.server.expect(JOB, {'resources_used.foo_i': '3'},
                           offset=20, id=jid1, interval=1)

        # Requeue the job
        self.server.rerunjob(jid1, runas=ROOT_USER)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)

        # Verify that resources_used.foo_i is unset
        self.server.expect(JOB, "resources_used.foo_i", op=UNSET, id=jid1)

        # Bring the sister mom back up
        self.momB.start()
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1, interval=1)

        # Verify that the value of foo_i for job1 is set back
        self.server.expect(JOB, {'resources_used.foo_i': '3'},
                           offset=10, id=jid1, interval=1)

    def test_job_rerun(self):
        """
        Test that resources accumulate only once when a job
        is rerun.
        """
        hook_body = """
import pbs
e = pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed periodic hook")
for jj in e.job_list.keys():
    e.job_list[jj].resources_used["foo_f"] = 1.01
    e.job_list[jj].resources_used["cput"] = 10
"""

        a = {'event': "exechost_periodic", 'enabled': 'True', 'freq': 10}
        self.server.create_import_hook(
            "period",
            a,
            hook_body,
            overwrite=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': "R", "substate": 42}, id=jid1)

        # Wait approximately 10s for the hook to get executed, then
        # verify resources_used.foo_f
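        # The periodic hook sets foo_f = 1.01 and cput = 10 on each of the
        # 3 moms, so foo_f accumulates to 3.03 and cput to 30.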
        a = {'resources_used.foo_f': '3.03',
             'resources_used.cput': 30}
        self.server.expect(JOB, a,
                           offset=10, id=jid1, attrop=PTL_AND, interval=1)

        # Rerun the job
        self.server.manager(MGR_CMD_SET, SERVER,
                            {'scheduling': 'False'})
        self.server.rerunjob(jobid=jid1, runas=ROOT_USER)
        self.server.expect(JOB,
                           {'job_state': 'Q'}, id=jid1)

        # Verify that foo_f is unset
        self.server.expect(JOB,
                           'resources_used.foo_f',
                           op=UNSET, id=jid1)

        # Turn scheduling back on
        self.server.manager(MGR_CMD_SET, SERVER,
                            {'scheduling': 'True'})
        self.server.expect(JOB, {'job_state': "R", "substate": 42},
                           attrop=PTL_AND, id=jid1)

        # Validate that resources_used.foo_f is reset and re-accumulated
        self.server.expect(JOB, a,
                           offset=10, id=jid1, attrop=PTL_AND, interval=1)

    def test_job_array(self):
        """
        Test that resource accumulation also works for subjobs.
        """
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook")
if e.job.in_ms_mom():
    e.job.resources_used["vmem"] = pbs.size("9gb")
    e.job.resources_used["foo_i"] = 9
    e.job.resources_used["foo_f"] = 0.09
    e.job.resources_used["foo_str"] = '{"seven":7}'
    e.job.resources_used["cput"] = 10
    e.job.resources_used["stra"] = '"glad,elated","happy"'
else:
    e.job.resources_used["vmem"] = pbs.size("10gb")
    e.job.resources_used["foo_i"] = 10
    e.job.resources_used["foo_f"] = 0.10
    e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}'
    e.job.resources_used["cput"] = 20
    e.job.resources_used["stra"] = '"cucumbers,bananas"'
"""

        a = {'event': "execjob_epilogue", 'enabled': 'True'}
        self.server.create_import_hook(
            "test",
            a,
            hook_body,
            overwrite=True)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER, attrs={ATTR_J: '1-2'})
        j.set_attributes(a)
        j.set_sleep_time("5")
        jid = self.server.submit(j)

        # Verify that once the subjobs are over, values are
        # set for each subjob in the accounting logs
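        # Each subjob accumulates independently: foo_i = 9 + 10 + 10 = 29,
        # foo_f = 0.29, vmem = 29gb, and cput = 00:00:50.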
        subjob1 = jid.replace('[]', '[1]')

        acctlog_match = 'resources_used.foo_f=0.29'
        # Below code is commented out due to a PTL issue
        # s = self.server.accounting_match(
        #     "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100)
        # self.assertTrue(s)

        acctlog_match = 'resources_used.foo_i=29'
        # s = self.server.accounting_match(
        #     "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100)
        # self.assertTrue(s)

        foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9}
        acctlog_match = "resources_used.foo_str='%s'" % (foo_str_dict_in,)
        # s = self.server.accounting_match(
        #     "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100)
        # self.assertTrue(s)

        acctlog_match = 'resources_used.vmem=29gb'
        # s = self.server.accounting_match(
        #     "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100)
        # self.assertTrue(s)

        acctlog_match = 'resources_used.cput=00:00:50'
        # s = self.server.accounting_match(
        #     "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100)
        # self.assertTrue(s)

        acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"'
        # s = self.server.accounting_match(
        #     "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100)
        # self.assertTrue(s)

    def test_epi_pro(self):
        """
        Test epilogue and prologue hooks changing the same
        and different resources. Values of the same resource
        get overwritten by the last hook to run.
        """
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "In prologue hook")
e.job.resources_used["foo_i"] = 10
e.job.resources_used["foo_f"] = 0.10
"""

        a = {'event': "execjob_prologue", 'enabled': 'True'}
        self.server.create_import_hook(
            "pro", a, hook_body,
            overwrite=True)

        # Verify the copy message in the logs to avoid
        # race conditions
        self.momA.log_match(
            "pro.PY;copy hook-related file", max_attempts=10)
        self.momB.log_match(
            "pro.PY;copy hook-related file", max_attempts=10)
        self.momC.log_match(
            "pro.PY;copy hook-related file", max_attempts=10)

        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "In epilogue hook")
e.job.resources_used["foo_f"] = 0.20
e.job.resources_used["cput"] = 10
"""

        a = {'event': "execjob_epilogue", 'enabled': 'True'}
        self.server.create_import_hook(
            "epi", a, hook_body,
            overwrite=True)

        # Verify the copy message in the logs to avoid
        # race conditions
        self.momA.log_match(
            "epi.PY;copy hook-related file", max_attempts=10)
        self.momB.log_match(
            "epi.PY;copy hook-related file", max_attempts=10)
        self.momC.log_match(
            "epi.PY;copy hook-related file", max_attempts=10)

        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.place': 'scatter'}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.create_script(
            "pbsdsh -n 1 hostname\n" +
            "pbsdsh -n 2 hostname\n" +
            "sleep 5\n")
        jid = self.server.submit(j)

        # Verify the resources_used once the job is over
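        # The prologue sets foo_i=10 and foo_f=0.10 on each of the 3 moms;
        # the epilogue later overwrites foo_f with 0.20 and sets cput=10.
        # Accumulated: foo_i = 30, foo_f = 0.6, cput = 30.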
        self.server.expect(JOB, {
            'resources_used.foo_i': '30',
            'resources_used.foo_f': '0.6',
            'resources_used.cput': '30',
            'job_state': 'F'}, attrop=PTL_AND,
            extend='x', id=jid, offset=5,
            max_attempts=60, interval=1)

        # Submit another job
        j1 = Job(TEST_USER)
        j1.set_attributes(a)
        j1.create_script(
            "pbsdsh -n 1 hostname\n" +
            "pbsdsh -n 2 hostname\n" +
            "sleep 300\n")
        jid1 = self.server.submit(j1)

        # Verify that the prologue hook has set the values
        self.server.expect(JOB, {
            'job_state': 'R',
            'resources_used.foo_i': '30',
            'resources_used.foo_f': '0.3'}, attrop=PTL_AND,
            id=jid1, max_attempts=30, interval=2)

        # Force delete the job
        self.server.deljob(id=jid1, wait=True, attr_W="force")

        # Verify that values are accumulated by the prologue hook only
        self.server.expect(JOB, {
            'resources_used.foo_i': '30',
            'resources_used.foo_f': '0.3'}, attrop=PTL_AND,
            extend='x', id=jid1)

    def test_server_restart2(self):
        """
        Test that a server restart during hook execution
        has no impact on resource accumulation.
        """
        hook_body = """
import pbs
import time
e = pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook")
if e.job.in_ms_mom():
    e.job.resources_used["vmem"] = pbs.size("9gb")
    e.job.resources_used["foo_i"] = 9
    e.job.resources_used["foo_f"] = 0.09
    e.job.resources_used["foo_str"] = '{"seven":7}'
    e.job.resources_used["cput"] = 10
else:
    e.job.resources_used["vmem"] = pbs.size("10gb")
    e.job.resources_used["foo_i"] = 10
    e.job.resources_used["foo_f"] = 0.10
    e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}'
    e.job.resources_used["cput"] = 20
time.sleep(15)
"""

        a = {'event': "execjob_epilogue", 'enabled': 'True'}
        self.server.create_import_hook(
            "epi", a, hook_body, overwrite=True)

        # Submit a job
        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': "scatter"}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time("5")
        jid = self.server.submit(j)

        # Verify the resource values
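        # Per the epilogue hook: foo_i = 9 (MS) + 10 + 10 (sisters) = 29,
        # foo_f = 0.29, and foo_str is the JSON union of the per-mom values.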
        a = {'resources_used.foo_i': 29,
             'resources_used.foo_f': 0.29,
             'resources_used.foo_str':
             "\'{\"eight\": 8, \"seven\": 7, \"nine\": 9}\'"}
        self.server.expect(JOB, a, extend='x', attrop=PTL_AND,
                           offset=5, id=jid, interval=1)

        # Restart the server while the hook is still executing
        self.server.restart()

        # Verify the values again
        self.server.expect(JOB, a, extend='x', attrop=PTL_AND,
                           id=jid)

    def test_mom_down2(self):
        """
        Test that when a mom is down, values are still
        accumulated for resources.
        """
        hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook")
if e.job.in_ms_mom():
    e.job.resources_used["vmem"] = pbs.size("9gb")
    e.job.resources_used["foo_i"] = 9
    e.job.resources_used["foo_f"] = 0.09
    e.job.resources_used["foo_str"] = '{"seven":7}'
    e.job.resources_used["cput"] = 10
    e.job.resources_used["stra"] = '"glad,elated","happy"'
else:
    e.job.resources_used["vmem"] = pbs.size("10gb")
    e.job.resources_used["foo_i"] = 10
    e.job.resources_used["foo_f"] = 0.10
    e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}'
    e.job.resources_used["cput"] = 20
    e.job.resources_used["stra"] = '"cucumbers,bananas"'
"""

        a = {'event': "execjob_epilogue",
             'enabled': 'True'}
        self.server.create_import_hook(
            "epi", a, hook_body,
            overwrite=True)

        # Submit a job
        a = {'Resource_List.select': '3:ncpus=1',
             'Resource_List.walltime': 10,
             'Resource_List.place': "scatter"}
        j = Job(TEST_USER)
        j.set_attributes(a)
        j.set_sleep_time("10")
        jid = self.server.submit(j)

        # Verify the job is running
        self.server.expect(JOB,
                           {'job_state': "R"}, id=jid)

        # Bring a sister mom down
        self.momB.stop()

        # Wait for the job to end and validate that the values are
        # being set with 2 moms only
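        # With momB down, only the MS and one sister report values:
        # foo_i = 9 + 10 = 19, foo_f = 0.09 + 0.10 = 0.19.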
        self.server.expect(JOB,
                           {'job_state': 'F',
                            'resources_used.foo_i': '19',
                            'resources_used.foo_f': '0.19',
                            'resources_used.foo_str':
                            '\'{\"eight\": 8, \"seven\": 7, \"nine\": 9}\''},
                           offset=10, id=jid, interval=1, extend='x',
                           attrop=PTL_AND)

        # Bring the mom back up
        self.momB.start()