# coding: utf-8

# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

from tests.functional import *


class Test_power_provisioning_sgi(TestFunctional):

    """
    Test power provisioning feature for the SGI platform.

    A stub power API script is installed on every MoM host as
    /opt/clmgr/power-service/hpe_clmgr_power_api.py and the eoe
    values are loaded from it.
    """

    # Body of the fake power API module copied to each MoM host.
    script = """
# Fake SGI API python
import time

def VerifyConnection():
    return "connected"

def ListAvailableProfiles():
    return ['100W', '150W', '200W', '250W', '300W', '350W', '400W', '450W',
            '500W', 'NONE']

def MonitorStart( nodeset_name, profile ):
    return None

def MonitorReport( nodeset_name ):
    # fake an energy value
    fmt = "%Y/%d/%m"
    now = time.time()
    st = time.strptime(time.strftime(fmt, time.localtime(now)), fmt)
    night = time.mktime(st)
    return ['total_energy', (now - night)/60000, 1415218704.5979109]

def MonitorStop( nodeset_name ):
    return None

def NodesetCreate( nodeset_name, node_hostname_list ):
    return None

def NodesetDelete( nodeset_name ):
    return None
"""

    # Value returned by setup_sgi_api(); filled in by setUp().
    power_nodes = None

    def setUp(self):
        """
        Don't set any special flags.
        Use the MOM's that are already setup or define the ones passed in.

        Removes the node whose Mom is the server host, installs the fake
        power API, enables the PBS_power hook and turns power_provisioning
        on for every mom in self.moms.  The test is skipped when no mom on
        a non-server host exists or when no node is left to provision.
        """
        TestFunctional.setUp(self)
        nodes = self.server.status(NODE)
        if self.check_mom_configuration():
            for n in nodes:
                host = n['Mom']
                if host is None:
                    continue
                # Delete the server side Mom
                if host == self.server.shortname:
                    self.server.manager(MGR_CMD_DELETE, NODE, None, host)
                    break
            # setup environment for power provisioning
            self.power_nodes = self.setup_sgi_api(self.script)
            if self.power_nodes == 0:
                self.skip_test("No mom found with power profile setup")
            else:
                # enable power hook
                self.enable_power()
                # list() keeps this working where keys() returns a view
                # that cannot be indexed.
                for mom_name in list(self.moms.keys()):
                    a = {'power_provisioning': 'True'}
                    self.server.manager(
                        MGR_CMD_SET, NODE, a, id=mom_name)
        else:
            self.skip_test("No mom defined on non-server host")

    def check_mom_configuration(self):
        """
        There needs to be at least one Mom that is not running on the
        server host.

        Returns True as soon as such a Mom is found and False when every
        Mom reported by the server is the server host.  When the server
        filter returns None the test is skipped instead.
        """
        moms = self.server.filter(NODE, 'Mom')
        if moms is None:
            self.skip_test(
                "No mom found at server/non-server host")
            return None
        for filt in moms.values():
            if filt[0] != self.server.shortname:
                self.logger.info("found different mom %s from local %s" %
                                 (filt, self.server.shortname))
                return True
        return False

    def setup_sgi_api(self, script, perm=0o755):
        """
        Setup a fake sgi_api script on all the nodes.

        script: text of the fake power API module
        perm: mode applied to the temporary copy of the script

        Return the number of nodes reported by the server.
        """
        # The destination is the same on every host, so build it once.
        pwr_dir = os.path.join(os.sep, "opt", "clmgr", "power-service")
        dest = os.path.join(pwr_dir, "hpe_clmgr_power_api.py")
        fn = self.du.create_temp_file(body=script)
        try:
            self.du.chmod(path=fn, mode=perm, sudo=True)
            done = set()
            nodes = self.server.status(NODE)
            for n in nodes:
                host = n['Mom']
                if host is None:
                    continue
                if host in done:
                    continue
                done.add(host)
                self.server.du.run_cmd(host, "mkdir -p " + pwr_dir,
                                       sudo=True)
                self.server.du.run_copy(host, fn, dest, True)
                # Set PBS_PMINAME=sgi in pbs_environment so the power hook
                # will use the SGI functionality.
                mom = self.moms[host]
                if mom is not None:
                    environ = {"PBS_PMINAME": "sgi"}
                    self.server.du.set_pbs_environment(host,
                                                       environ=environ)
                    self.server.du.run_cmd(
                        host,
                        "chown root %s" % os.path.join(
                            mom.pbs_conf['PBS_HOME'], "pbs_environment"),
                        sudo=True)
                else:
                    self.skip_test("Need to pass atleast one mom "
                                   "use -p moms=")
        finally:
            # Always drop the local temporary copy, even when a step above
            # raised or the test was skipped.
            os.remove(fn)
        return len(nodes)

    def revert_sgi_api(self):
        """
        Remove any fake sgi_api from the nodes.
        Each Mom host is visited once; nothing is returned.
        """
        pwr_dir = os.path.join(os.sep, "opt", "clmgr", "power-service")
        dest = os.path.join(pwr_dir, "hpe_clmgr_power_api.py")
        done = set()
        nodes = self.server.status(NODE)
        for n in nodes:
            host = n['Mom']
            if host is None:
                continue
            if host in done:
                continue
            done.add(host)
            self.server.du.run_cmd(host, "rm " + dest, sudo=True)

    def enable_power(self):
        """
        Enable power_provisioning on the server.

        Enables the PBS_power hook, waits for each Mom to log that it
        received the hook file and then sends that Mom a HUP signal.
        """
        a = {'enabled': 'True'}
        hook_name = "PBS_power"
        self.server.manager(MGR_CMD_SET, PBS_HOOK, a, id=hook_name,
                            sudo=True)
        done = set()		# check that hook becomes active
        nodes = self.server.status(NODE)
        for n in nodes:
            host = n['Mom']
            if host is None:
                continue
            if host in done:
                continue
            # Remember the host so a Mom serving several nodes is only
            # checked and signalled once.
            done.add(host)
            mom = self.moms[host]
            s = mom.log_match(
                "Hook;PBS_power.HK;copy hook-related file request received",
                starttime=self.server.ctime, max_attempts=60)
            self.assertTrue(s)
            mom.signal("-HUP")

    def submit_job(self, secs=10, attr=None):
        """
        Submit a sleep job as TEST_USER and return its job id.

        secs: sleep time for the job
        attr: any job attributes (may be None); the caller's dict is not
              modified
        """
        # Work on a copy so the default of None is usable and the
        # caller's dictionary is left untouched.
        attr = dict(attr) if attr else {}
        attr['Keep_Files'] = 'oe'
        j = Job(TEST_USER, attrs=attr)
        j.set_sleep_time(secs)
        self.logger.info(str(j))
        jid = self.server.submit(j)
        return jid

    def energy_check(self, jid):
        """
        Look up the accounting E record of job jid.

        Return True when the record contains resources_used.energy and
        False otherwise.  Fails the test when no E record is found.
        """
        s = self.server.accounting_match("E;%s;.*" % jid, regexp=True)
        self.assertTrue(s is not None)
        # got the account record, hack it apart
        for resc in s[1].split(';')[3].split():
            if resc.partition('=')[0] == "resources_used.energy":
                return True
        return False

    def eoe_check(self, jid, eoe, secs):
        """
        Verify current_eoe handling for job jid.

        jid: job id
        eoe: profile name the job requested
        secs: offset passed to the expect() that waits for job_state to
              become unset
        """
        # check that job is running and that the vnode has current_eoe set
        qstat = self.server.status(JOB, id=jid)
        vname = qstat[0]['exec_vnode'].partition(':')[0].strip('(')
        self.server.expect(VNODE, {'current_eoe': eoe}, id=vname)
        self.server.expect(JOB, 'job_state', op=UNSET, id=jid, offset=secs)
        host = qstat[0]['exec_host'].partition('/')[0]
        mom = self.moms[host]		# top mom
        s = mom.log_match(".*;Job;%s;PMI: reset current_eoe.*" % jid,
                          regexp=True, starttime=self.server.ctime,
                          max_attempts=10)
        self.assertTrue(s)
        # check that vnode has current_eoe unset
        self.server.expect(VNODE, {'current_eoe': eoe}, id=vname, op=UNSET)

    def eoe_job(self, num, eoe):
        """
        Helper function to submit a job with an eoe value.
        Parameters:
        num: number of chunks
        eoe: profile name

        Returns the job id once eoe_check() has passed.
        """
        secs = 10
        jid = self.submit_job(secs,
                              {'Resource_List.select': '%d:eoe=%s' %
                               (num, eoe)})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.eoe_check(jid, eoe, secs)
        return jid

    def test_sgi_job(self):
        """
        Submit jobs with an eoe value and check that messages are logged
        indicating PMI activity, and current_eoe and
        resources_used.energy get set.
        """
        # Make sure eoe is set correctly on the vnodes
        eoes = set()		# use sets to be order independent
        nodes = list()
        for n in self.server.status(NODE):
            name = n['id']
            if 'resources_available.eoe' in n:
                self.server.manager(MGR_CMD_SET, NODE,
                                    {"power_provisioning": True}, name)
                nodes.append(name)
                curr = n['resources_available.eoe'].split(',')
                self.logger.info("%s has eoe values %s" %
                                 (name, str(curr)))
                if not eoes:  # empty set
                    eoes.update(curr)
                else:  # all vnodes must have same eoes
                    self.assertTrue(eoes == set(curr))
        self.assertTrue(len(eoes) > 0)

        # submit jobs for each eoe value
        for eoe in eoes:
            for x in range(1, len(nodes) + 1):
                jid = self.eoe_job(x, eoe)
                # energy_check() only reports; the assertion is made here.
                self.assertTrue(
                    self.energy_check(jid),
                    "resources_used.energy missing for job %s" % jid)

    def test_sgi_eoe_job(self):
        """
        Submit jobs with an eoe values and check that messages are logged
        indicating PMI activity, and current_eoe and
        resources_used.energy get set.
        """
        eoes = ['100W', '150W', '450W']
        for x in range(1, self.power_nodes + 1):
            # Iterate without consuming the list so every chunk count is
            # exercised with every profile.
            for eoe_profile in eoes:
                jid = self.eoe_job(x, eoe_profile)
                self.assertTrue(
                    self.energy_check(jid),
                    "resources_used.energy missing for job %s" % jid)

    def test_sgi_request_more_power_nodes(self):
        """
        Submit job with available+1 power nodes and verify job comment.
        """
        total_nodes = self.power_nodes + 1
        jid = self.submit_job(10, {'Resource_List.place': 'scatter',
                                   'Resource_List.select': '%d:eoe=%s' %
                                   (total_nodes, '150W')})
        msg = "Can Never Run: Not enough total nodes available"
        self.server.expect(JOB, {'job_state': 'Q', 'comment': msg},
                           id=jid)

    def test_sgi_job_multiple_eoe(self):
        """
        Submit jobs requesting multiple eoe and job should rejected by qsub.
        """
        a = {'Resource_List.place': 'scatter',
             'Resource_List.select': '10:eoe=150W+10:eoe=300W'}
        try:
            self.submit_job(attr=a)
        except PbsSubmitError as e:
            self.assertTrue(
                'Invalid provisioning request in chunk' in e.msg[0])
        else:
            # Without this the test would pass when qsub accepts the job.
            self.fail("job requesting multiple eoe values was not rejected")

    def test_sgi_server_prov_off(self):
        """
        Submit jobs requesting eoe when power provisioning unset on server
        and verify that jobs wont run.
        """
        a = {'enabled': 'False'}
        hook_name = "PBS_power"
        self.server.manager(MGR_CMD_SET, PBS_HOOK, a, id=hook_name,
                            sudo=True)
        self.server.expect(SERVER, {'power_provisioning': 'False'})
        eoes = ['150W', '300W', '450W']
        for profile in eoes:
            jid = self.submit_job(10,
                                  {'Resource_List.place': 'scatter',
                                   'Resource_List.select': '%d:eoe=%s'
                                   % (self.power_nodes, profile)})
            self.server.expect(JOB, {
                'job_state': 'Q',
                'comment': 'Not Running: No available resources on nodes'},
                id=jid)

    def test_sgi_node_prov_off(self):
        """
        Submit jobs requesting eoe and verify that jobs won't run on
        nodes where power provisioning is set to false.
        """
        eoes = ['100W', '250W', '300W', '400W']
        # set power_provisioning to off where eoe is set to false
        # The slice visits at most power_nodes moms and cannot run past
        # the end of the mom list.
        for mom_name in list(self.moms.keys())[:self.power_nodes]:
            a = {'power_provisioning': 'False'}
            self.server.manager(MGR_CMD_SET, NODE, a, id=mom_name)
        for profile in eoes:
            jid = self.submit_job(10,
                                  {'Resource_List.place': 'scatter',
                                   'Resource_List.select': '%d:eoe=%s'
                                   % (self.power_nodes, profile)})
            msg = "Not Running: No available resources on nodes"
            self.server.expect(JOB, {'job_state': 'Q', 'comment': msg},
                               id=jid)

    def test_sgi_job_preemption(self):
        """
        Submit job to a high priority queue and verify
        that job is preempted by requeueing.
        """
        # One cpu per mom so the second job has to preempt the first.
        for mom_name in list(self.moms.keys())[:self.power_nodes]:
            a = {'resources_available.ncpus': 1}
            self.server.manager(MGR_CMD_SET, NODE, a, id=mom_name)
        self.server.manager(MGR_CMD_CREATE, QUEUE,
                            {'queue_type': 'execution', 'started': 'True',
                             'enabled': 'True', 'priority': 150},
                            id='workq2')
        jid = self.submit_job(30,
                              {'Resource_List.place': 'scatter',
                               'Resource_List.select': '%d:eoe=%s'
                               % (self.power_nodes, '150W')})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        t = int(time.time())
        jid_workq2 = self.submit_job(10,
                                     {ATTR_queue: 'workq2',
                                      'Resource_List.place': 'scatter',
                                      'Resource_List.select': '%d:eoe=%s' %
                                      (self.power_nodes, '150W')})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid_workq2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
        self.scheduler.log_match("Job preempted by requeuing", starttime=t)

    def tearDown(self):
        """
        Remove the fake power API from the nodes, then run the base
        class tearDown.
        """
        # remove SGI fake script file
        self.revert_sgi_api()
        TestFunctional.tearDown(self)