# coding: utf-8 # Copyright (C) 1994-2018 Altair Engineering, Inc. # For more information, contact Altair at www.altair.com. # # This file is part of the PBS Professional ("PBS Pro") software. # # Open Source License Information: # # PBS Pro is free software. You can redistribute it and/or modify it under the # terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. # See the GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # Commercial License Information: # # For a copy of the commercial license terms and conditions, # go to: (http://www.pbspro.com/UserArea/agreement.html) # or contact the Altair Legal Department. # # Altair’s dual-license business model allows companies, individuals, and # organizations to create proprietary derivative works of PBS Pro and # distribute them - whether embedded or bundled with other software - # under a commercial license agreement. # # Use of Altair’s trademarks, including but not limited to "PBS™", # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's # trademark licensing policies. from tests.functional import * class TestNodeBuckets(TestFunctional): """ Test basic functionality of node buckets. """ def setUp(self): TestFunctional.setUp(self) day = time.strftime("%Y%m%d", time.localtime(time.time())) filename = os.path.join(self.server.pbs_conf['PBS_HOME'], 'sched_logs', day) self.du.rm(path=filename, force=True, sudo=True, level=logging.DEBUG2) self.colors = \ ['red', 'orange', 'yellow', 'green', 'blue', 'indigo', 'violet'] self.shapes = ['circle', 'square', 'triangle', 'diamond', 'pyramid', 'sphere', 'cube'] self.letters = ['A', 'B', 'C', 'D', 'E', 'F', 'G'] self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'string', 'flag': 'h'}, id='color') self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'string_array', 'flag': 'h'}, id='shape') self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'string_array', 'flag': 'h'}, id='letter') self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'boolean', 'flag': 'h'}, id='bool') a = {'resources_available.ncpus': 2, 'resources_available.mem': '8gb'} # 10010 nodes since it divides into 7 evenly. # Each node bucket will have 1430 nodes in it self.server.create_vnodes(name='vnode', attrib=a, num=10010, mom=self.mom, sharednode=False, expect=False, attrfunc=self.cust_attr_func) # Make sure all the nodes are in state free. We can't let # create_vnodes() do this because it does a pbsnodes -v on each vnode. # This takes a long time. self.server.expect(NODE, {'state=free': (GE, 10010)}) self.scheduler.add_resource('color') self.scheduler.set_sched_config({'log_filter': '2048'}) def cust_attr_func(self, name, totalnodes, numnode, attribs): """ Add resources to vnodes. There are 10010 nodes, which means 1430 nodes of each color, letter, and shape. The value of bool is True for the last 5005 nodes and unset for the first 5005 nodes """ a = {'resources_available.color': self.colors[numnode / 1430], 'resources_available.shape': self.shapes[numnode % 7], 'resources_available.letter': self.letters[numnode % 7]} if numnode / 5005 == 0: a['resources_available.bool'] = 'True' # Yellow buckets get a higher priority if numnode / 1430 == 2: a['Priority'] = 100 return dict(attribs.items() + a.items()) def check_normal_path(self, sel='2:ncpus=2:mem=1gb', pl='scatter:excl', queue='workq'): """ Check if a job runs in the normal code path """ a = {'Resource_List.select': sel, 'Resource_List.place': pl, 'queue': queue} j = Job(TEST_USER, attrs=a) jid = self.server.submit(j) self.scheduler.log_match(jid + ';Evaluating subchunk', n=10000) self.server.delete(jid, wait=True) @timeout(450) def test_basic(self): """ Request nodes of a specific color and make sure they are correctly allocated to the job """ chunk = '4:ncpus=1:color=yellow' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl'} J = Job(TEST_USER, a) jid = self.server.submit(J) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) js = self.server.status(JOB, id=jid) nodes = J.get_vnodes(js[0]['exec_vnode']) for node in nodes: n = self.server.status(NODE, 'resources_available.color', id=node) self.assertTrue('yellow' in n[0]['resources_available.color']) @timeout(450) def test_multi_bucket(self): """ Request two different chunk types which need to be allocated from different buckets and make sure they are allocated correctly. """ a = {'Resource_List.select': '4:ncpus=1:color=yellow+4:ncpus=1:color=blue', 'Resource_List.place': 'scatter:excl'} J = Job(TEST_USER, a) jid = self.server.submit(J) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ', n=10000) js = self.server.status(JOB, id=jid) nodes = J.get_vnodes(js[0]['exec_vnode']) # Yellow nodes were requested first. # Make sure they come before the blue nodes. for i in range(4): n = self.server.status(NODE, id=nodes[i]) self.assertTrue('yellow' in n[0]['resources_available.color']) for i in range(4, 8): n = self.server.status(NODE, id=nodes[i]) self.assertTrue('blue' in n[0]['resources_available.color']) @timeout(450) def test_multi_bucket2(self): """ Request nodes from all 7 different buckets and see them allocated correctly """ select = "" for c in self.colors: select += "1:ncpus=1:color=%s+" % (c) # remove the trailing '+' select = select[:-1] a = {'Resource_List.select': select, 'Resource_List.place': 'scatter:excl'} J = Job(TEST_USER, a) jid = self.server.submit(J) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk:', n=10000) js = self.server.status(JOB, id=jid) nodes = J.get_vnodes(js[0]['exec_vnode']) for i, node in enumerate(nodes): n = self.server.status(NODE, id=node) self.assertTrue(self.colors[i] in n[0]['resources_available.color']) @timeout(450) def test_not_run(self): """ Request more nodes of one color that is available to make sure the job is not run on incorrect nodes. """ chunk = '1431:ncpus=1:color=yellow' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl'} J = Job(TEST_USER, a) jid = self.server.submit(J) a = {'comment': (MATCH_RE, '^Can Never Run'), 'job_state': 'Q'} self.server.expect(JOB, a, attrop=PTL_AND, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) @timeout(450) def test_calendaring1(self): """ Test to see that nodes that are used in the future for calendared jobs are not used for filler jobs that would distrupt the scheduled time. """ self.scheduler.set_sched_config({'strict_ordering': 'True'}) chunk1 = '1:ncpus=1' a = {'Resource_List.select': chunk1, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '1:00:00'} j = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000) chunk2 = '10010:ncpus=1' a = {'Resource_List.select': chunk2, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '2:00:00'} j = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j) self.server.expect(JOB, 'comment', op=SET, id=jid2) self.server.expect(JOB, {'job_state': 'Q'}, id=jid2) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000) chunk3 = '2:ncpus=1' a = {'Resource_List.select': chunk3, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '30:00'} j = Job(TEST_USER, attrs=a) jid3 = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid3) self.scheduler.log_match(jid3 + ';Chunk: ' + chunk3, n=10000) a = {'Resource_List.select': chunk3, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '2:30:00'} j = Job(TEST_USER, attrs=a) jid4 = self.server.submit(j) self.server.expect(JOB, 'comment', op=SET, id=jid4) self.server.expect(JOB, {'job_state': 'Q'}, id=jid4) self.scheduler.log_match(jid4 + ';Chunk: ' + chunk3, n=10000) @timeout(450) def test_calendaring2(self): """ Test that nodes that a reservation calendared on them later on are used before totally free nodes """ self.scheduler.set_sched_config({'strict_ordering': 'True'}) now = int(time.time()) a = {'Resource_List.select': '1:vnode=vnode[2865]+1:vnode=vnode[2870]', 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '1:00:00', 'reserve_start': now + 3600, 'reserve_end': now + 7200} r = Reservation(TEST_USER, attrs=a) rid = self.server.submit(r) self.server.expect(RESV, {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid) chunk = '2:ncpus=1:color=yellow' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '30:00'} j = Job(TEST_USER, attrs=a) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) s = self.server.status(JOB, 'exec_vnode', id=jid) n = j.get_vnodes(s[0]['exec_vnode']) msg = 'busy_later nodes not chosen first' self.assertTrue('vnode[2865]' in n, msg) self.assertTrue('vnode[2870]' in n, msg) @timeout(450) def test_calendaring3(self): """ Test that a future reservation's nodes are used first for a job that is put into the calendar. """ self.scheduler.set_sched_config({'strict_ordering': 'True'}) now = int(time.time()) a = {'Resource_List.select': '1:vnode=vnode[2865]+1:vnode=vnode[2870]', 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '1:00:00', 'reserve_start': now + 3600, 'reserve_end': now + 7200} r = Reservation(TEST_USER, attrs=a) rid = self.server.submit(r) self.server.expect(RESV, {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid) chunk1 = '1430:ncpus=1:color=yellow' a = {'Resource_List.select': chunk1, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '30:00'} j = Job(TEST_USER, attrs=a) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk1, n=10000) chunk2 = '2:ncpus=1:color=yellow' a = {'Resource_List.select': chunk2, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '15:00'} j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000) self.server.expect(JOB, 'estimated.exec_vnode', op=SET, id=jid2) s = self.server.status(JOB, 'estimated.exec_vnode', id=jid2) n = j2.get_vnodes(s[0]['estimated.exec_vnode']) msg = 'busy_later nodes not chosen first' self.assertTrue('vnode[2865]' in n, msg) self.assertTrue('vnode[2870]' in n, msg) @timeout(450) def test_buckets_and_non(self): """ Test that jobs requesting buckets and not requesting buckets play nice together """ # vnode[1435] is orange a = {'Resource_List.ncpus': 1, 'Resource_List.vnode': 'vnode[1435]'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.scheduler.log_match(jid1 + ';Evaluating subchunk', n=10000) chunk = '1429:ncpus=1:color=orange' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl'} j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.server.expect(JOB, {'job_state': 'R'}, id=jid2) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk, n=10000) s1 = self.server.status(JOB, 'exec_vnode', id=jid1) s2 = self.server.status(JOB, 'exec_vnode', id=jid2) nodes1 = j1.get_vnodes(s1[0]['exec_vnode']) nodes2 = j2.get_vnodes(s2[0]['exec_vnode']) msg = 'Job 1 and Job 2 are sharing nodes' for n in nodes2: self.assertNotEqual(n, nodes1[0], msg) @timeout(600) def test_not_buckets(self): """ Test to make sure the jobs that should use the standard node searching code path do not use the bucket code path """ # Running a 10010 cpu job through the normal code path spams the log. # We don't care about it, so there is no reason to increase # the log size by so much. self.scheduler.set_sched_config({'log_filter': '3328'}) # Run a job on all nodes leaving 1 cpus available on each node j = Job(TEST_USER, {'Resource_List.select': '10010:ncpus=1', 'Resource_List.place': 'scatter'}) j.set_sleep_time(600) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.set_sched_config({'log_filter': '2048'}) # Node sorting via unused resources uses the standard code path self.logger.info('Test node_sort_key with unused resources') a = {'node_sort_key': '\"ncpus HIGH unused\"'} self.scheduler.set_sched_config(a) self.check_normal_path() self.scheduler.revert_to_defaults() schd_attr = {'log_filter': '2048'} self.scheduler.set_sched_config(schd_attr) # provisioning_policy: avoid_provisioning uses the standard code path self.logger.info('Test avoid_provision') a = {'provision_policy': 'avoid_provision'} self.scheduler.set_sched_config(a) self.check_normal_path() self.scheduler.revert_to_defaults() self.scheduler.add_resource('color') self.scheduler.set_sched_config(schd_attr) # the bucket codepath requires excl self.logger.info('Test different place specs') self.check_normal_path(pl='scatter:shared') self.check_normal_path(pl='free') # can't request host or vnode resources on the bucket codepath self.logger.info('Test jobs requesting host and vnode') self.check_normal_path(sel='1:ncpus=2:host=vnode[0]') self.check_normal_path(sel='1:ncpus=2:vnode=vnode[0]') # suspended jobs use the normal codepath self.logger.info('Test suspended job') a = {'queue_type': 'execution', 'started': 'True', 'enabled': 'True', 'priority': 200} self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='expressq') self.server.delete(jid, wait=True) a = {'Resource_List.select': '1430:ncpus=1:color=orange', 'Resource_List.place': 'scatter:excl'} j2 = Job(TEST_USER, a) jid2 = self.server.submit(j2) self.server.expect(JOB, {'job_state': 'R'}, id=jid2) a = {'Resource_List.select': '1:ncpus=1:color=orange', 'queue': 'expressq'} j3 = Job(TEST_USER, a) jid3 = self.server.submit(j3) self.server.expect(JOB, {'job_state': 'S'}, id=jid2) self.server.expect(JOB, {'job_state': 'R'}, id=jid3) self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.scheduler.log_match(jid3 + ';Evaluating subchunk', n=10000) self.server.delete([jid2, jid3], wait=True) # Checkpointed jobs use normal code path self.logger.info('Test checkpointed job') chk_script = """#!/bin/bash kill $1 exit 0 """ self.chk_file = self.du.create_temp_file(body=chk_script) self.du.chmod(path=self.chk_file, mode=0o755) self.du.chown(path=self.chk_file, uid=0, gid=0, sudo=True) c = {'$action': 'checkpoint_abort 30 !' + self.chk_file + ' %sid'} self.mom.add_config(c) self.scheduler.set_sched_config({'preempt_order': 'C'}) attrs = {'Resource_List.select': '1430:ncpus=1:color=orange', 'Resource_List.place': 'scatter:excl'} j_c1 = Job(TEST_USER, attrs) jid_c1 = self.server.submit(j_c1) self.server.expect(JOB, {'job_state': 'R'}, id=jid_c1) self.scheduler.log_match( jid_c1 + ';Chunk: 1430:ncpus=1:color=orange', n=10000) a = {'Resource_List.select': '1:ncpus=1:color=orange', 'queue': 'expressq'} j_c2 = Job(TEST_USER, a) jid_c2 = self.server.submit(j_c2) self.server.expect(JOB, {'job_state': 'Q'}, id=jid_c1) self.server.expect(JOB, {'job_state': 'R'}, id=jid_c2) self.scheduler.log_match( jid_c1 + ";Job preempted by checkpointing", n=10000) self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.scheduler.log_match(jid_c2 + ';Evaluating subchunk', n=10000) self.server.delete([jid_c1, jid_c2], wait=True) # Job's in reservations use the standard codepath self.logger.info('Test job in reservation') now = int(time.time()) a = {'Resource_List.select': '4:ncpus=2:mem=4gb', 'Resource_List.place': 'scatter:excl', 'reserve_start': now + 30, 'reserve_end': now + 120} r = Reservation(TEST_USER, a) rid = self.server.submit(r) self.server.expect(RESV, {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}) self.logger.info('Waiting 30s for reservation to start') self.server.expect(RESV, {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')}, offset=30) r_queue = rid.split('.')[0] self.check_normal_path(sel='1:ncpus=3', queue=r_queue) self.server.delete(rid) # Jobs on multi-vnoded systems use the standard codepath self.logger.info('Test job on multi-vnoded system') a = {'resources_available.ncpus': 2, 'resources_available.mem': '8gb'} self.server.create_vnodes('vnode', a, 8, self.mom, sharednode=False, vnodes_per_host=4) self.check_normal_path(sel='2:ncpus=8') @timeout(450) def test_multi_vnode_resv(self): """ Test that node buckets do not get in the way of running jobs on multi-vnoded systems in reservations """ a = {'resources_available.ncpus': 2, 'resources_available.mem': '8gb'} self.server.create_vnodes('vnode', a, 12, self.mom, sharednode=False, vnodes_per_host=4, attrfunc=self.cust_attr_func) now = int(time.time()) a = {'Resource_List.select': '8:ncpus=1', 'Resource_List.place': 'vscatter', 'reserve_start': now + 30, 'reserve_end': now + 3600} r = Reservation(TEST_USER, attrs=a) rid = self.server.submit(r) a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')} self.server.expect(RESV, a, id=rid) self.logger.info('Waiting 30s for reservation to start') a['reserve_state'] = (MATCH_RE, 'RESV_RUNNING|5') self.server.expect(RESV, a, id=rid, offset=30) a = {'Resource_List.select': '2:ncpus=1', 'Resource_List.place': 'group=shape', 'queue': rid.split('.')[0]} j = Job(TEST_USER, attrs=a) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Evaluating subchunk', n=10000) ev = self.server.status(JOB, 'exec_vnode', id=jid) used_nodes = j.get_vnodes(ev[0]['exec_vnode']) n = self.server.status(NODE, 'resources_available.shape') s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes] self.assertEqual(len(set(s)), 1, "Job1 ran in more than one placement set") @timeout(450) def test_bucket_sort(self): """ Test if buckets are sorted properly: all of the yellow bucket also has priority 100. It should be the first bucket. """ a = {'node_sort_key': '\"sort_priority HIGH\"'} self.scheduler.set_sched_config(a) chunk = '2:ncpus=1' j = Job(TEST_USER, {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl'}) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) jobs = self.server.status(JOB, {'exec_vnode'}) jn = j.get_vnodes(jobs[0]['exec_vnode']) n1 = self.server.status(NODE, 'resources_available.color', id=jn[0]) n2 = self.server.status(NODE, 'resources_available.color', id=jn[1]) c1 = n1[0]['resources_available.color'] c2 = n2[0]['resources_available.color'] self.assertEquals(c1, 'yellow', "Job didn't run on yellow nodes") self.assertEquals(c2, 'yellow', "Job didn't run on yellow nodes") @timeout(450) def test_psets(self): """ Test placement sets with node buckets """ a = {'node_group_key': 'shape', 'node_group_enable': 'True', 'scheduling': 'False'} self.server.manager(MGR_CMD_SET, SERVER, a, expect=True) chunk = '1430:ncpus=1' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.server.expect(JOB, {'job_state': 'R'}, id=jid2) self.scheduler.log_match(jid1 + ';Chunk: ' + chunk, n=10000) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk, n=10000) ev = self.server.status(JOB, 'exec_vnode', id=jid1) used_nodes1 = j1.get_vnodes(ev[0]['exec_vnode']) n = self.server.status(NODE, 'resources_available.shape') s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes1] self.assertEqual(len(set(s)), 1, "Job1 ran in more than one placement set") ev = self.server.status(JOB, 'exec_vnode', id=jid2) used_nodes2 = j2.get_vnodes(ev[0]['exec_vnode']) s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes2] self.assertEqual(len(set(s)), 1, "Job2 ran in more than one placement set") for node in used_nodes1: self.assertNotIn(node, used_nodes2, 'Jobs share nodes: ' + node) @timeout(450) def test_psets_calendaring(self): """ Test that jobs in the calendar fit within a placement set """ self.scheduler.set_sched_config({'strict_ordering': 'True'}) self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': 5}) chunk1 = '10010:ncpus=1' a = {'Resource_List.select': chunk1, 'Resource_List.place': 'scatter:excl', 'Resource_List.walltime': '1:00:00'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000) svr_attr = {'node_group_key': 'shape', 'node_group_enable': 'True'} self.server.manager(MGR_CMD_SET, SERVER, svr_attr) chunk2 = '1430:ncpus=1' a['Resource_List.select'] = chunk2 j2 = Job(TEST_USER, a) jid2 = self.server.submit(j2) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000) self.scheduler.log_match(jid2 + ';Job is a top job', n=10000) n = self.server.status(NODE, 'resources_available.shape') ev = self.server.status(JOB, 'estimated.exec_vnode', id=jid2) used_nodes2 = j2.get_vnodes(ev[0]['estimated.exec_vnode']) s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes2] self.assertEqual(len(set(s)), 1, "Job1 will run in more than one placement set") j3 = Job(TEST_USER, a) jid3 = self.server.submit(j3) self.scheduler.log_match(jid3 + ';Chunk: ' + chunk2, n=10000) self.scheduler.log_match(jid3 + ';Job is a top job', n=10000) ev = self.server.status(JOB, 'estimated.exec_vnode', id=jid3) used_nodes3 = j3.get_vnodes(ev[0]['estimated.exec_vnode']) s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes3] self.assertEqual(len(set(s)), 1, "Job1 will run in more than one placement set") for node in used_nodes2: self.assertNotIn(node, used_nodes3, 'Jobs will share nodes: ' + node) @timeout(450) def test_place_group(self): """ Test node buckets with place=group """ chunk = '1430:ncpus=1' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl:group=letter'} j = Job(TEST_USER, attrs=a) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) ev = self.server.status(JOB, 'exec_vnode', id=jid) used_nodes = j.get_vnodes(ev[0]['exec_vnode']) n = self.server.status(NODE, 'resources_available.letter') s = [x['resources_available.letter'] for x in n if x['id'] in used_nodes] self.assertEqual(len(set(s)), 1, "Job ran in more than one placement set") @timeout(450) def test_psets_spanning(self): """ Request more nodes than available in one placement set and see the job span or not depending on the value of do_not_span_psets """ a = {'node_group_key': 'shape', 'node_group_enable': 'True'} self.server.manager(MGR_CMD_SET, SERVER, a) a = {'do_not_span_psets': 'True'} self.server.manager(MGR_CMD_SET, SCHED, a, id='default') # request one more node than the largest placement set chunk = '1431:ncpus=1' a = {'Resource_List.select': chunk, 'Resource_List.place': 'scatter:excl'} j = Job(TEST_USER, attrs=a) jid = self.server.submit(j) a = {'job_state': 'Q', 'comment': (MATCH_RE, 'can\'t fit in the largest placement set, ' 'and can\'t span psets')} self.server.expect(JOB, a, attrop=PTL_AND, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) a = {'do_not_span_psets': 'False'} self.server.manager(MGR_CMD_SET, SCHED, a, id='default') self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.server.expect(JOB, {'job_state': 'R'}, id=jid) ev = self.server.status(JOB, 'exec_vnode', id=jid) used_nodes = j.get_vnodes(ev[0]['exec_vnode']) n = self.server.status(NODE, 'resources_available.shape') s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes] self.assertGreater(len(set(s)), 1, "Job did not span properly") @timeout(450) def test_psets_queue(self): """ Test that placement sets work for nodes associated with queues """ a = {'node_group_key': 'shape', 'node_group_enable': 'True'} self.server.manager(MGR_CMD_SET, SERVER, a) a = {'queue_type': 'Execution', 'started': 'True', 'enabled': 'True'} self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='workq2') # Take the first 14 vnodes. This means there are two nodes per shape nodes = ['vnode[0]', 'vnode[1]', 'vnode[2]', 'vnode[3]', 'vnode[4]', 'vnode[5]', 'vnode[6]', 'vnode[7]', 'vnode[8]', 'vnode[9]', 'vnode[10]', 'vnode[11]', 'vnode[12]', 'vnode[13]'] self.server.manager(MGR_CMD_SET, NODE, {'queue': 'workq2'}, id=nodes) chunk = '2:ncpus=1' a = {'Resource_List.select': chunk, 'queue': 'workq2', 'Resource_List.place': 'scatter:excl'} for _ in range(7): j = Job(TEST_USER, a) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) # Check to see if jobs ran in one placement set jobs = self.server.status(JOB) for job in jobs: ev = self.server.status(JOB, 'exec_vnode', id=job['id']) used_nodes = j.get_vnodes(ev[0]['exec_vnode']) n = self.server.status(NODE, 'resources_available.shape') s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes] self.assertEqual(len(set(s)), 1, "Job " + job['id'] + "ran in more than one placement set") s = self.server.select() for jid in s: self.server.delete(jid, wait=True) # Check to see of jobs span correctly chunk = '7:ncpus=1' a = {'Resource_List.select': chunk, 'queue': 'workq2', 'Resource_List.place': 'scatter:excl'} j = Job(TEST_USER, a) jid = self.server.submit(j) self.server.expect(JOB, {'job_state': 'R'}, id=jid) self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000) ev = self.server.status(JOB, 'exec_vnode', id=jid) used_nodes = j.get_vnodes(ev[0]['exec_vnode']) n = self.server.status(NODE, 'resources_available.shape') s = [x['resources_available.shape'] for x in n if x['id'] in used_nodes] self.assertGreater(len(set(s)), 1, "Job did not span properly") @timeout(450) def test_free(self): """ Test that free placement works with the bucket code path """ self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'}) chunk = '1430:ncpus=1:color=yellow' a = {'Resource_List.select': chunk, 'Resource_List.place': 'excl'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.server.expect(JOB, {'job_state': 'R'}, id=jid2) self.scheduler.log_match(jid1 + ';Chunk: ' + chunk, n=10000) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk, n=10000) s1 = self.server.status(JOB, 'exec_vnode', id=jid1) s2 = self.server.status(JOB, 'exec_vnode', id=jid2) n1 = j1.get_vnodes(s1[0]['exec_vnode']) n2 = j1.get_vnodes(s2[0]['exec_vnode']) msg = 'job did not run on correct number of nodes' self.assertEquals(len(n1), 715, msg) self.assertEquals(len(n2), 715, msg) for node in n1: self.assertTrue(node not in n2, 'Jobs share nodes: ' + node) @timeout(450) def test_queue_nodes(self): """ Test that buckets work with nodes associated to a queue """ v1 = 'vnode[1431]' v2 = 'vnode[1435]' a = {'queue_type': 'execution', 'started': 'True', 'enabled': 'True'} self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='q2') self.server.manager(MGR_CMD_SET, NODE, {'queue': 'q2'}, id=v1) self.server.manager(MGR_CMD_SET, NODE, {'queue': 'q2'}, id=v2) chunk1 = '1428:ncpus=1:color=orange' a = {'Resource_List.select': chunk1, 'Resource_List.place': 'scatter:excl'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000) job = self.server.status(JOB, 'exec_vnode', id=jid1)[0] ev = j1.get_vnodes(job['exec_vnode']) msg = 'Job is using queue\'s nodes' self.assertNotIn(v1, ev) self.assertNotIn(v2, ev) chunk2 = '2:ncpus=1' a = {'Resource_List.select': chunk2, 'Resource_List.place': 'scatter:excl', 'queue': 'q2'} j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.server.expect(JOB, {'job_state': 'R'}, id=jid2) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000) job = self.server.status(JOB, 'exec_vnode', id=jid2)[0] ev = j2.get_vnodes(job['exec_vnode']) msg = 'Job running on nodes not associated with queue' self.assertIn(v1, ev, msg) self.assertIn(v2, ev, msg) @timeout(450) def test_booleans(self): """ Test that booleans are correctly handled if not in the sched_config resources line. This means that an unset boolean is considered false and that booleans that are True are considered even though they aren't on the resources line. """ chunk1 = '2:ncpus=1' a = {'Resource_List.select': chunk1, 'Resource_List.place': 'scatter:excl'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000) jst = self.server.status(JOB, 'exec_vnode', id=jid1)[0] ev = j1.get_vnodes(jst['exec_vnode']) for n in ev: self.server.expect( NODE, {'resources_available.bool': 'True'}, id=n) chunk2 = '2:ncpus=1:bool=False' a['Resource_List.select'] = chunk2 j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.server.expect(JOB, {'job_state': 'R'}, id=jid2) self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000) jst = self.server.status(JOB, 'exec_vnode', id=jid2)[0] ev = j2.get_vnodes(jst['exec_vnode']) for n in ev: self.server.expect( NODE, 'resources_available.bool', op=UNSET, id=n) @timeout(450) def test_last_pset_can_never_run(self): """ Test that the job does not retain the error value of last placement set seen by the node bucketing code. To check this make sure that the last placement set check results into a 'can never run' case because resources do not match and check that the job is not marked as never run. """ self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'long', 'flag': 'nh'}, id='foo') self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'string', 'flag': 'h'}, id='bar') self.server.manager(MGR_CMD_SET, SERVER, {'node_group_key': 'bar'}) self.server.manager(MGR_CMD_SET, SERVER, {'node_group_enable': 'true'}) self.mom.delete_vnode_defs() a = {'resources_available.ncpus': 80, 'resources_available.bar': 'large'} self.server.create_vnodes(name='vnode', attrib=a, num=8, mom=self.mom, sharednode=False) self.scheduler.add_resource('foo') a['resources_available.foo'] = 8 a['resources_available.ncpus'] = 8 a['resources_available.bar'] = 'small' for val in range(0, 5): vname = "vnode[" + str(val) + "]" self.server.manager(MGR_CMD_SET, NODE, a, id=vname) chunk1 = '4:ncpus=5:foo=5' a = {'Resource_List.select': chunk1, 'Resource_List.place': 'scatter:excl'} j1 = Job(TEST_USER, attrs=a) jid1 = self.server.submit(j1) self.server.expect(JOB, {'job_state': 'R'}, id=jid1) j2 = Job(TEST_USER, attrs=a) jid2 = self.server.submit(j2) self.server.expect(JOB, {'job_state': 'Q'}, id=jid2) self.scheduler.log_match(jid2 + ';Job will never run', existence=False, max_attempts=10)