pbs_hook_execjob_prologue.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. class TestPbsExecutePrologue(TestFunctional):
  38. """
  39. This tests the feature in PBS that allows execjob_prologue hook to
  40. execute on all sister moms all the time, and not just when first
  41. task is spawned on the node.
  42. PRE: Have a cluster of PBS with 3 mom hosts.
  43. """
  44. def setUp(self):
  45. if len(self.moms) != 3:
  46. self.skip_test(reason="need 3 mom hosts: -p moms=<m1>:<m2>:<m3>")
  47. TestFunctional.setUp(self)
  48. self.momA = self.moms.values()[0]
  49. self.momB = self.moms.values()[1]
  50. self.momC = self.moms.values()[2]
  51. self.hostA = self.momA.shortname
  52. self.hostB = self.momB.shortname
  53. self.hostC = self.momC.shortname
  54. self.server.expect(VNODE, {'state=free': 3}, op=GE, max_attempts=10,
  55. interval=2)
  56. def test_prologue_execute_on_all_moms(self):
  57. """
  58. Test to make sure execjob_prologue always get
  59. executed on all sister moms when mother superior
  60. has successfully executed its prologue hook.
  61. """
  62. hook_name = "prologue_logmsg"
  63. hook_body = ("import pbs\n"
  64. "e = pbs.event()\n"
  65. "pbs.logjobmsg(e.job.id, 'executed prologue hook')\n")
  66. attr = {'event': 'execjob_prologue', 'enabled': 'True'}
  67. self.server.create_import_hook(hook_name, attr, hook_body)
  68. attr = {'resources_available.ncpus': 1,
  69. 'resources_available.mem': '2gb'}
  70. self.server.manager(MGR_CMD_SET, NODE, attr, id=self.hostA)
  71. self.server.manager(MGR_CMD_SET, NODE, attr, id=self.hostB)
  72. self.server.manager(MGR_CMD_SET, NODE, attr, id=self.hostC)
  73. attr = {'Resource_List.select': '3:ncpus=1',
  74. 'Resource_List.place': 'scatter',
  75. 'Resource_List.walltime': 30}
  76. j = Job(TEST_USER, attrs=attr)
  77. jid = self.server.submit(j)
  78. self.momB.log_match("Job;%s;JOIN_JOB as node" % jid, n=100,
  79. max_attempts=10, interval=2)
  80. self.momC.log_match("Job;%s;JOIN_JOB as node" % jid, n=100,
  81. max_attempts=10, interval=2)
  82. self.momA.log_match("Job;%s;executed prologue hook" % jid,
  83. n=100, max_attempts=10, interval=2)
  84. self.momB.log_match("Job;%s;executed prologue hook" % jid,
  85. n=100, max_attempts=10, interval=2)
  86. self.momC.log_match("Job;%s;executed prologue hook" % jid,
  87. n=100, max_attempts=10, interval=2)
  88. def test_prologue_internal_error_no_fail_action(self):
  89. """
  90. Test a prologue hook with an internal error and no fail_action.
  91. """
  92. hook_name = "prologue_exception"
  93. hook_body = ("import pbs\n"
  94. "e = pbs.event()\n"
  95. "x\n")
  96. attr = {'event': 'execjob_prologue',
  97. 'enabled': 'True'}
  98. self.server.create_import_hook(hook_name, attr, hook_body)
  99. attr = {'Resource_List.select': 'vnode=%s' % self.hostA,
  100. 'Resource_List.walltime': 30}
  101. j = Job(TEST_USER, attrs=attr)
  102. j.set_sleep_time(1)
  103. self.server.submit(j)
  104. self.server.expect(NODE, {'state': 'free'}, id=self.hostA, offset=1)
  105. def test_prologue_internal_error_offline_vnodes(self):
  106. """
  107. Test a prologue hook with an internal error and
  108. fail_action=offline_vnodes.
  109. """
  110. attr = {'resources_available.mem': '2gb',
  111. 'resources_available.ncpus': '1'}
  112. self.server.create_vnodes(self.hostC, attr, 3, self.momC, delall=True,
  113. usenatvnode=True)
  114. hook_name = "prologue_exception"
  115. hook_body = ("import pbs\n"
  116. "e = pbs.event()\n"
  117. "x\n")
  118. attr = {'event': 'execjob_prologue',
  119. 'enabled': 'True',
  120. 'fail_action': 'offline_vnodes'}
  121. self.server.create_import_hook(hook_name, attr, hook_body)
  122. attr = {'Resource_List.select': 'vnode=%s[0]' % self.hostC,
  123. 'Resource_List.walltime': 30}
  124. j = Job(TEST_USER, attrs=attr)
  125. self.server.submit(j)
  126. attr = {'state': 'offline',
  127. 'comment': "offlined by hook '%s' due to hook error"
  128. % hook_name}
  129. self.server.expect(VNODE, attr, id=self.hostC, max_attempts=10,
  130. interval=2)
  131. self.server.expect(VNODE, attr, id='%s[0]' % self.hostC,
  132. max_attempts=10, interval=2)
  133. self.server.expect(VNODE, attr, id='%s[1]' % self.hostC,
  134. max_attempts=10, interval=2)
  135. # revert momC
  136. self.server.manager(MGR_CMD_SET, NODE, {'state': (DECR, 'offline')},
  137. id=self.hostC)
  138. self.server.manager(MGR_CMD_SET, NODE, {'state': (DECR, 'offline')},
  139. id='%s[0]' % self.hostC)
  140. self.server.manager(MGR_CMD_SET, NODE, {'state': (DECR, 'offline')},
  141. id='%s[1]' % self.hostC)
  142. self.server.manager(MGR_CMD_UNSET, NODE, 'comment',
  143. id=self.hostC)
  144. self.server.manager(MGR_CMD_UNSET, NODE, 'comment',
  145. id='%s[0]' % self.hostC)
  146. self.server.manager(MGR_CMD_UNSET, NODE, 'comment',
  147. id='%s[1]' % self.hostC)
  148. self.momC.revert_to_defaults()
  149. def test_prologue_hook_set_fail_action(self):
  150. """
  151. Test that fail_actions can be set on execjob_prologue
  152. hooks by qmgr.
  153. """
  154. hook_name = "prologue"
  155. hook_body = ("import pbs\n"
  156. "pbs.event().accept()\n")
  157. attr = {'event': 'execjob_prologue',
  158. 'enabled': 'True'}
  159. self.server.create_import_hook(hook_name, attr, hook_body)
  160. self.server.expect(HOOK, {'fail_action': 'none'})
  161. self.server.manager(MGR_CMD_SET, HOOK,
  162. {'fail_action': 'offline_vnodes'},
  163. id=hook_name)
  164. self.server.expect(HOOK, {'fail_action': 'offline_vnodes'})
  165. self.server.manager(MGR_CMD_SET, HOOK,
  166. {'fail_action': 'scheduler_restart_cycle'},
  167. id=hook_name)
  168. self.server.expect(HOOK, {'fail_action': 'scheduler_restart_cycle'},
  169. id=hook_name)
  170. def test_prologue_hook_set_job_attr(self):
  171. """
  172. Test that a execjob_prologue hook can modify job attributes.
  173. """
  174. hook_name = "prologue_set_job_attr"
  175. hook_body = ("import pbs\n"
  176. "pbs.event().job.resources_used['file']="
  177. "pbs.size('2gb')\n")
  178. attr = {'event': 'execjob_prologue',
  179. 'enabled': 'True'}
  180. self.server.create_import_hook(hook_name, attr, hook_body)
  181. self.server.manager(MGR_CMD_SET, SERVER,
  182. {'job_history_enable': 'True'})
  183. j = Job(TEST_USER)
  184. j.set_sleep_time(1)
  185. jid = self.server.submit(j)
  186. attr = {'resources_used.file': '2gb'}
  187. self.server.expect(JOB, attr, id=jid, extend='x', offset=1)
  188. self.server.accounting_match(
  189. "E;" + jid + ";.*resources_used.file=2gb", regexp=True,
  190. max_attempts=10)
  191. def test_prologue_hook_fail_action_non_mom_hook(self):
  192. """
  193. Test that when fail action is set to anything other than 'None' on
  194. a mom hook, and the mom hook event is not either execjob_begin,
  195. exechost_startup, execjob_prologue, an error message is dispalyed
  196. """
  197. hook_name = "prologue"
  198. hook_body = ("import pbs\n"
  199. "pbs.event().accept()\n")
  200. attr = {'event': 'exechost_periodic',
  201. 'fail_action': 'offline_vnodes'}
  202. try:
  203. self.server.create_import_hook(hook_name, attr, hook_body)
  204. except PbsManagerError, e:
  205. exp_err = "Can't set hook fail_action value to 'offline_vnodes':"
  206. exp_err += " hook event must"
  207. exp_err += " contain at least one of execjob_begin"
  208. exp_err += ", exechost_startup, execjob_prologue"
  209. self.assertTrue(exp_err in e.msg[0])
  210. def test_prologue_hook_does_not_execute_twice_on_pbsdsh(self):
  211. """
  212. This test creates a hook and then submits a job.
  213. It then uses the job output file to do a log_match
  214. on both the moms
  215. """
  216. hook_name = 'prologue'
  217. hook_body = ("import pbs\n"
  218. "e = pbs.event()\n"
  219. "pbs.logjobmsg(e.job.id, 'executed prologue hook')\n")
  220. attr = {'event': 'execjob_prologue'}
  221. self.server.create_import_hook(hook_name, attr, hook_body)
  222. j = Job(TEST_USER, {'Resource_List.select': '2:ncpus=1',
  223. 'Resource_List.place': 'scatter'})
  224. j.create_script('#!/bin/sh\npbsdsh hostname\nsleep 10\n')
  225. jid = self.server.submit(j)
  226. attribs = self.server.status(JOB, id=jid)
  227. self.server.expect(JOB, 'queue', op=UNSET, id=jid, offset=10)
  228. host, opath = attribs[0]['Output_Path'].split(':', 1)
  229. ret = self.du.cat(hostname=host, filename=opath, runas=TEST_USER)
  230. _msg = "cat command failed with error: %s" % ret['err']
  231. self.assertEqual(ret['rc'], 0, _msg)
  232. mom1 = ret['out'][2].split(".")[0]
  233. mom2 = ret['out'][3].split(".")[0]
  234. self.exec_mom1 = self.moms[mom1]
  235. self.exec_mom2 = self.moms[mom2]
  236. self.exec_mom1.log_match("Job;%s;executed prologue hook" % jid)
  237. self.exec_mom2.log_match("Job;%s;executed prologue hook" % jid)