pbs_power_provisioning_sgi.py 15 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. class Test_power_provisioning_sgi(TestFunctional):
  38. """
  39. Test power provisioning feature for the SGI platform.
  40. Create stub SGI API script at /opt/sgi/ta and load eoe's from it.
  41. """
  42. script = \
  43. """
  44. # Fake SGI API python
  45. import time
  46. def VerifyConnection():
  47. return "connected"
  48. def ListAvailableProfiles():
  49. return ['100W', '150W', '200W', '250W', '300W', '350W', '400W', '450W',
  50. '500W', 'NONE']
  51. def MonitorStart( nodeset_name, profile ):
  52. return None
  53. def MonitorReport( nodeset_name ):
  54. # fake an energy value
  55. fmt = "%Y/%d/%m"
  56. now = time.time()
  57. st = time.strptime(time.strftime(fmt, time.localtime(now)), fmt)
  58. night = time.mktime(st)
  59. return ['total_energy', (now - night)/60000, 1415218704.5979109]
  60. def MonitorStop( nodeset_name ):
  61. return None
  62. def NodesetCreate( nodeset_name, node_hostname_list ):
  63. return None
  64. def NodesetDelete( nodeset_name ):
  65. return None
  66. """
  67. power_nodes = None
  68. def setUp(self):
  69. """
  70. Don't set any special flags.
  71. Use the MOM's that are already setup or define the ones passed in.
  72. """
  73. TestFunctional.setUp(self)
  74. nodes = self.server.status(NODE)
  75. if(self.check_mom_configuration()):
  76. for n in nodes:
  77. host = n['Mom']
  78. if host is None:
  79. continue
  80. # Delete the server side Mom
  81. if host == self.server.shortname:
  82. self.server.manager(MGR_CMD_DELETE, NODE, None, host)
  83. break
  84. # setup environment for power provisioning
  85. self.power_nodes = self.setup_sgi_api(self.script)
  86. if(self.power_nodes == 0):
  87. self.skip_test("No mom found with power profile setup")
  88. else:
  89. # enable power hook
  90. self.enable_power()
  91. for i in range(0, len(self.moms)):
  92. a = {'power_provisioning': 'True'}
  93. self.server.manager(
  94. MGR_CMD_SET, NODE, a, id=self.moms.keys()[i])
  95. else:
  96. self.skip_test("No mom defined on non-server host")
  97. def check_mom_configuration(self):
  98. """
  99. There needs to be at least one Mom that is not running on the
  100. server host.
  101. """
  102. multimom = False
  103. moms = self.server.filter(NODE, 'Mom')
  104. if moms is not None:
  105. for filt in moms.values():
  106. if filt[0] != self.server.shortname:
  107. self.logger.info("found different mom %s from local %s" %
  108. (filt, self.server.shortname))
  109. multimom = True
  110. return True
  111. if not multimom:
  112. return False
  113. else:
  114. self.skip_test(
  115. "No mom found at server/non-server host")
  116. def setup_sgi_api(self, script, perm=0o755):
  117. """
  118. Setup a fake sgi_api script on all the nodes.
  119. Return the number of nodes.
  120. """
  121. fn = self.du.create_temp_file(body=script)
  122. self.du.chmod(path=fn, mode=perm, sudo=True)
  123. done = set()
  124. nodes = self.server.status(NODE)
  125. for n in nodes:
  126. host = n['Mom']
  127. if host is None:
  128. continue
  129. if host in done:
  130. continue
  131. done.add(host)
  132. pwr_dir = os.path.join(os.sep, "opt", "clmgr", "power-service")
  133. dest = os.path.join(pwr_dir, "hpe_clmgr_power_api.py")
  134. self.server.du.run_cmd(host, "mkdir -p " + pwr_dir, sudo=True)
  135. self.server.du.run_copy(host, fn, dest, True)
  136. # Set PBS_PMINAME=sgi in pbs_environment so the power hook
  137. # will use the SGI functionality.
  138. mom = self.moms[host]
  139. if mom is not None:
  140. environ = {"PBS_PMINAME": "sgi"}
  141. self.server.du.set_pbs_environment(host,
  142. environ=environ)
  143. self.server.du.run_cmd(host, "chown root %s" %
  144. os.path.join(mom.pbs_conf[
  145. 'PBS_HOME'],
  146. "pbs_environment"),
  147. sudo=True)
  148. else:
  149. self.skip_test("Need to pass atleast one mom "
  150. "use -p moms=<mom1:mom2>")
  151. os.remove(fn)
  152. return len(nodes)
  153. def revert_sgi_api(self):
  154. """
  155. Remove any fake sgi_api from the nodes.
  156. Return the number of nodes.
  157. """
  158. done = set()
  159. nodes = self.server.status(NODE)
  160. for n in nodes:
  161. host = n['Mom']
  162. if host is None:
  163. continue
  164. if host in done:
  165. continue
  166. done.add(host)
  167. pwr_dir = os.path.join(os.sep, "opt", "clmgr", "power-service")
  168. dest = os.path.join(pwr_dir, "hpe_clmgr_power_api.py")
  169. self.server.du.run_cmd(host, "rm " + dest, sudo=True)
  170. def enable_power(self):
  171. """
  172. Enable power_provisioning on the server.
  173. """
  174. a = {'enabled': 'True'}
  175. hook_name = "PBS_power"
  176. self.server.manager(MGR_CMD_SET, PBS_HOOK, a, id=hook_name,
  177. sudo=True)
  178. done = set() # check that hook becomes active
  179. nodes = self.server.status(NODE)
  180. for n in nodes:
  181. host = n['Mom']
  182. if host is None:
  183. continue
  184. if host in done:
  185. continue
  186. mom = self.moms[host]
  187. s = mom.log_match(
  188. "Hook;PBS_power.HK;copy hook-related file request received",
  189. starttime=self.server.ctime, max_attempts=60)
  190. self.assertTrue(s)
  191. mom.signal("-HUP")
  192. def submit_job(self, secs=10, attr=None):
  193. """
  194. secs: sleep time for the job
  195. a: any job attributes
  196. """
  197. attr['Keep_Files'] = 'oe'
  198. j = Job(TEST_USER, attrs=attr)
  199. j.set_sleep_time(secs)
  200. self.logger.info(str(j))
  201. jid = self.server.submit(j)
  202. return jid
  203. def energy_check(self, jid):
  204. s = self.server.accounting_match("E;%s;.*" % jid,
  205. regexp=True)
  206. self.assertTrue(s is not None)
  207. # got the account record, hack it apart
  208. for resc in s[1].split(';')[3].split():
  209. if resc.partition('=')[0] == "resources_used.energy":
  210. return True
  211. return False
  212. def eoe_check(self, jid, eoe, secs):
  213. # check that job is running and that the vnode has current_eoe set
  214. qstat = self.server.status(JOB, id=jid)
  215. vname = qstat[0]['exec_vnode'].partition(':')[0].strip('(')
  216. self.server.expect(VNODE, {'current_eoe': eoe}, id=vname)
  217. self.server.expect(JOB, 'job_state', op=UNSET, id=jid, offset=secs)
  218. host = qstat[0]['exec_host'].partition('/')[0]
  219. mom = self.moms[host] # top mom
  220. s = mom.log_match(".*;Job;%s;PMI: reset current_eoe.*" % jid,
  221. regexp=True, starttime=self.server.ctime,
  222. max_attempts=10)
  223. self.assertTrue(s)
  224. # check that vnode has current_eoe unset
  225. self.server.expect(VNODE, {'current_eoe': eoe}, id=vname, op=UNSET)
  226. def eoe_job(self, num, eoe):
  227. """
  228. Helper function to submit a job with an eoe value.
  229. Parameters:
  230. num: number of chunks
  231. eoe: profile name
  232. """
  233. secs = 10
  234. jid = self.submit_job(secs,
  235. {'Resource_List.select': '%d:eoe=%s' % (num,
  236. eoe)})
  237. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  238. self.eoe_check(jid, eoe, secs)
  239. return jid
  240. def test_sgi_job(self):
  241. """
  242. Submit jobs with an eoe value and check that messages are logged
  243. indicating PMI activity, and current_eoe and resources_used.energy
  244. get set.
  245. """
  246. # Make sure eoe is set correctly on the vnodes
  247. eoes = set() # use sets to be order independent
  248. nodes = list()
  249. for n in self.server.status(NODE):
  250. name = n['id']
  251. if 'resources_available.eoe' in n:
  252. self.server.manager(MGR_CMD_SET, NODE,
  253. {"power_provisioning": True}, name)
  254. nodes.append(name)
  255. curr = n['resources_available.eoe'].split(',')
  256. self.logger.info("%s has eoe values %s" % (name, str(curr)))
  257. if len(eoes) == 0: # empty set
  258. eoes.update(curr)
  259. else: # all vnodes must have same eoes
  260. self.assertTrue(eoes == set(curr))
  261. self.assertTrue(len(eoes) > 0)
  262. # submit jobs for each eoe value
  263. while len(eoes) > 0:
  264. eoe = eoes.pop()
  265. for x in range(1, len(nodes) + 1):
  266. jid = self.eoe_job(x, eoe)
  267. self.energy_check(jid)
  268. def test_sgi_eoe_job(self):
  269. """
  270. Submit jobs with an eoe values and check that messages are logged
  271. indicating PMI activity, and current_eoe and resources_used.energy
  272. get set.
  273. """
  274. eoes = ['100W', '150W', '450W']
  275. for x in range(1, self.power_nodes + 1):
  276. while len(eoes) > 0:
  277. eoe_profile = eoes.pop()
  278. jid = self.eoe_job(x, eoe_profile)
  279. self.energy_check(jid)
  280. def test_sgi_request_more_power_nodes(self):
  281. """
  282. Submit job with available+1 power nodes and verify job comment.
  283. """
  284. total_nodes = self.power_nodes + 1
  285. jid = self.submit_job(10, {'Resource_List.place': 'scatter',
  286. 'Resource_List.select': '%d:eoe=%s'
  287. % (total_nodes, '150W')})
  288. msg = "Can Never Run: Not enough total nodes available"
  289. self.server.expect(JOB, {'job_state': 'Q', 'comment': msg},
  290. id=jid)
  291. def test_sgi_job_multiple_eoe(self):
  292. """
  293. Submit jobs requesting multiple eoe and job should rejected by qsub.
  294. """
  295. try:
  296. a = {'Resource_List.place': 'scatter',
  297. 'Resource_List.select': '10:eoe=150W+10:eoe=300W'}
  298. self.submit_job(attr=a)
  299. except PbsSubmitError as e:
  300. self.assertTrue(
  301. 'Invalid provisioning request in chunk' in e.msg[0])
  302. def test_sgi_server_prov_off(self):
  303. """
  304. Submit jobs requesting eoe when power provisioning unset on server
  305. and verify that jobs wont run.
  306. """
  307. a = {'enabled': 'False'}
  308. hook_name = "PBS_power"
  309. self.server.manager(MGR_CMD_SET, PBS_HOOK, a, id=hook_name,
  310. sudo=True)
  311. self.server.expect(SERVER, {'power_provisioning': 'False'})
  312. eoes = ['150W', '300W', '450W']
  313. for profile in eoes:
  314. jid = self.submit_job(10,
  315. {'Resource_List.place': 'scatter',
  316. 'Resource_List.select': '%d:eoe=%s'
  317. % (self.power_nodes, profile)})
  318. self.server.expect(JOB, {
  319. 'job_state': 'Q',
  320. 'comment': 'Not Running: No available resources on nodes'},
  321. id=jid)
  322. def test_sgi_node_prov_off(self):
  323. """
  324. Submit jobs requesting eoe and verify that jobs won't run on
  325. nodes where power provisioning is set to false.
  326. """
  327. eoes = ['100W', '250W', '300W', '400W']
  328. # set power_provisioning to off where eoe is set to false
  329. for i in range(0, self.power_nodes):
  330. a = {'power_provisioning': 'False'}
  331. self.server.manager(MGR_CMD_SET, NODE, a, id=self.moms.keys()[i])
  332. for profile in eoes:
  333. jid = self.submit_job(10,
  334. {'Resource_List.place': 'scatter',
  335. 'Resource_List.select': '%d:eoe=%s'
  336. % (self.power_nodes, profile)})
  337. msg = "Not Running: No available resources on nodes"
  338. self.server.expect(JOB, {'job_state': 'Q', 'comment': msg},
  339. id=jid)
  340. def test_sgi_job_preemption(self):
  341. """
  342. Submit job to a high priority queue and verify
  343. that job is preempted by requeueing.
  344. """
  345. for i in range(0, self.power_nodes):
  346. a = {'resources_available.ncpus': 1}
  347. self.server.manager(MGR_CMD_SET, NODE, a, id=self.moms.keys()[i])
  348. self.server.manager(MGR_CMD_CREATE, QUEUE,
  349. {'queue_type': 'execution', 'started': 'True',
  350. 'enabled': 'True', 'priority': 150}, id='workq2')
  351. jid = self.submit_job(30,
  352. {'Resource_List.place': 'scatter',
  353. 'Resource_List.select': '%d:eoe=%s'
  354. % (self.power_nodes, '150W')})
  355. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  356. t = int(time.time())
  357. jid_workq2 = self.submit_job(10, {ATTR_queue: 'workq2',
  358. 'Resource_List.place': 'scatter',
  359. 'Resource_List.select': '%d:eoe=%s' %
  360. (self.power_nodes, '150W')})
  361. self.server.expect(JOB, {'job_state': 'R'}, id=jid_workq2)
  362. self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
  363. self.scheduler.log_match("Job preempted by requeuing", starttime=t)
  364. def tearDown(self):
  365. # remove SGI fake script file
  366. self.revert_sgi_api()
  367. TestFunctional.tearDown(self)