pbs_smoketest.py

# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
from ptl.utils.pbs_testsuite import *


@tags('smoke')
class SmokeTest(PBSTestSuite):

    """
    This test suite contains a few smoke tests of PBS
    """

    # Class variables
    resc_types = [None, 'long', 'float', 'boolean', 'size', 'string',
                  'string_array']
    resc_flags = [None, 'n', 'h', 'nh', 'q', 'f', 'fh']
    resc_flags_ctl = [None, 'r', 'i']
    objs = [QUEUE, SERVER, NODE, JOB, RESV]
    resc_name = "ptl_custom_res"
    avail_resc_name = 'resources_available.' + resc_name
    pu = ProcUtils()

    def test_submit_job(self):
        """
        Test to submit a job
        """
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)

    @skipOnCpuSet
    def test_submit_job_array(self):
        """
        Test to submit a job array
        """
        a = {'resources_available.ncpus': 8}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
        j = Job(TEST_USER)
        j.set_attributes({ATTR_J: '1-3:1'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'B'}, jid)
        self.server.expect(JOB, {'job_state=R': 3}, count=True,
                           id=jid, extend='t')

    @skipOnCpuSet
    def test_advance_reservation(self):
        """
        Test to submit an advance reservation and submit jobs to that
        reservation. Check that the reservation gets confirmed and that the
        jobs inside the reservation start running when the reservation runs.
        """
        a = {'resources_available.ncpus': 4}
        self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
        r = Reservation(TEST_USER)
        now = int(time.time())
        a = {'Resource_List.select': '1:ncpus=4',
             'reserve_start': now + 10,
             'reserve_end': now + 110}
        r.set_attributes(a)
        rid = self.server.submit(r)
        rid_q = rid.split('.')[0]
        a = {'reserve_state': (MATCH_RE, "RESV_CONFIRMED|2")}
        self.server.expect(RESV, a, id=rid)
        # submit a normal job and an array job to the reservation
        a = {'Resource_List.select': '1:ncpus=1',
             ATTR_q: rid_q}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        a = {'Resource_List.select': '1:ncpus=1',
             ATTR_q: rid_q, ATTR_J: '1-2'}
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        a = {'reserve_state': (MATCH_RE, "RESV_RUNNING|5")}
        self.server.expect(RESV, a, id=rid, interval=1)
        self.server.expect(JOB, {'job_state': 'R'}, jid1)
        self.server.expect(JOB, {'job_state': 'B'}, jid2)

    def test_standing_reservation(self):
        """
        Test to submit a standing reservation
        """
        # The PBS_TZID environment variable must be set; there is no way to
        # set it through the API call, so use the CLI instead for this test
        _m = self.server.get_op_mode()
        if _m != PTL_CLI:
            self.server.set_op_mode(PTL_CLI)
        if 'PBS_TZID' in self.conf:
            tzone = self.conf['PBS_TZID']
        elif 'PBS_TZID' in os.environ:
            tzone = os.environ['PBS_TZID']
        else:
            self.logger.info('Missing timezone, using America/Los_Angeles')
            tzone = 'America/Los_Angeles'
        a = {'Resource_List.select': '1:ncpus=1',
             ATTR_resv_rrule: 'FREQ=WEEKLY;COUNT=3',
             ATTR_resv_timezone: tzone,
             ATTR_resv_standing: '1',
             'reserve_start': time.time() + 20,
             'reserve_end': time.time() + 30, }
        r = Reservation(TEST_USER, a)
        rid = self.server.submit(r)
        a = {'reserve_state': (MATCH_RE, "RESV_CONFIRMED|2")}
        self.server.expect(RESV, a, id=rid)
        if _m == PTL_API:
            self.server.set_op_mode(PTL_API)

    @skipOnCpuSet
    def test_degraded_advance_reservation(self):
        """
        Make reservations more fault tolerant
        Test for an advance reservation
        """
        now = int(time.time())
        a = {'reserve_retry_init': 5, 'reserve_retry_cutoff': 1}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        a = {'resources_available.ncpus': 4}
        self.server.create_vnodes('vn', a, num=2, mom=self.mom)
        a = {'Resource_List.select': '1:ncpus=4',
             'reserve_start': now + 3600,
             'reserve_end': now + 7200}
        r = Reservation(TEST_USER, attrs=a)
        rid = self.server.submit(r)
        a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
        self.server.expect(RESV, a, id=rid)
        self.server.status(RESV, 'resv_nodes', id=rid)
        resv_node = self.server.reservations[rid].get_vnodes()[0]
        a = {'state': 'offline'}
        self.server.manager(MGR_CMD_SET, NODE, a, id=resv_node)
        a = {'reserve_state': (MATCH_RE, 'RESV_DEGRADED|10')}
        self.server.expect(RESV, a, id=rid)
        a = {'resources_available.ncpus': (GT, 0)}
        free_nodes = self.server.filter(NODE, a)
        nodes = free_nodes.values()[0]
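        # Pick the free vnode that is not the one the reservation was
        # originally confirmed on; the degraded reservation should be
        # reconfirmed onto that other vnode.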
        other_node = [nodes[0], nodes[1]][resv_node == nodes[0]]
        a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2'),
             'resv_nodes': (MATCH_RE, re.escape(other_node))}
        self.server.expect(RESV, a, id=rid, offset=3, attrop=PTL_AND)

    def test_select(self):
        """
        Test to qselect
        """
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, jid)
        jobs = self.server.select()
        self.assertNotEqual(jobs, None)

    def test_alter(self):
        """
        Test to alter job
        """
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'},
                            expect=True)
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
        self.server.alterjob(jid, {'comment': 'job comment altered'})
        self.server.expect(JOB, {'comment': 'job comment altered'}, id=jid)

    def test_sigjob(self):
        """
        Test to signal job
        """
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R', 'substate': 42},
                           attrop=PTL_AND, id=jid)
        self.server.sigjob(jid, 'suspend')
        self.server.expect(JOB, {'job_state': 'S'}, id=jid)
        self.server.sigjob(jid, 'resume')
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)

    @skipOnCpuSet
    def test_backfilling(self):
        """
        Test for backfilling
        """
        a = {'resources_available.ncpus': 2}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        self.scheduler.set_sched_config({'strict_ordering': 'True'})
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.walltime': 3600}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        a = {'Resource_List.select': '1:ncpus=2',
             'Resource_List.walltime': 3600}
        j = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j)
        self.server.expect(JOB, 'comment', op=SET, id=jid1)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
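        # With strict_ordering on, jid1 becomes the blocked top job; a short
        # 30-minute job that fits around it should be backfilled and run
        # immediately instead of waiting behind jid1.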
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.walltime': 1800}
        j = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)

    def test_hold_release(self):
        """
        Test to hold and release a job
        """
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        a = {'job_state': 'R', 'substate': '42'}
        self.server.expect(JOB, a, jid, attrop=PTL_AND)
        self.server.holdjob(jid, USER_HOLD)
        self.server.expect(JOB, {'Hold_Types': 'u'}, jid)
        self.server.rlsjob(jid, USER_HOLD)
        self.server.expect(JOB, {'Hold_Types': 'n'}, jid)

    @skipOnCpuSet
    def test_create_vnode(self):
        """
        Test to create vnodes
        """
        self.server.expect(SERVER, {'pbs_version': '8'}, op=GT)
        self.server.manager(MGR_CMD_DELETE, NODE, None, "")
        a = {'resources_available.ncpus': 20, 'sharing': 'force_excl'}
        momstr = self.mom.create_vnode_def('testnode', a, 10)
        self.mom.insert_vnode_def(momstr)
        self.server.manager(MGR_CMD_CREATE, NODE, None, self.mom.hostname)
        a = {'resources_available.ncpus=20': 10}
        self.server.expect(VNODE, a, count=True, interval=5)

    def test_create_execution_queue(self):
        """
        Test to create execution queue
        """
        qname = 'testq'
        try:
            self.server.manager(MGR_CMD_DELETE, QUEUE, None, qname)
        except:
            pass
        a = {'queue_type': 'Execution', 'enabled': 'True', 'started': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, qname, expect=True)
        self.server.manager(MGR_CMD_DELETE, QUEUE, id=qname)

    def test_create_routing_queue(self):
        """
        Test to create routing queue
        """
        qname = 'routeq'
        try:
            self.server.manager(MGR_CMD_DELETE, QUEUE, None, qname)
        except:
            pass
        a = {'queue_type': 'Route', 'started': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, qname, expect=True)
        self.server.manager(MGR_CMD_DELETE, QUEUE, id=qname)

    @skipOnCpuSet
    def test_fgc_limits(self):
        """
        Test for server job run limits (max_run)
        """
        a = {'resources_available.ncpus': 4}
        self.server.create_vnodes('lt', a, 2, self.mom)
        a = {'max_run': '[u:' + str(TEST_USER) + '=2]'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        self.server.expect(SERVER, a)
        j1 = Job(TEST_USER)
        j2 = Job(TEST_USER)
        j3 = Job(TEST_USER)
        j1id = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, j1id)
        j2id = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
        j3id = self.server.submit(j3)
        self.server.expect(JOB, 'comment', op=SET, id=j3id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j3id)

    @skipOnCpuSet
    def test_limits(self):
        """
        Test for server resource run limits (max_run_res)
        """
        a = {'resources_available.ncpus': 4}
        self.server.create_vnodes('lt', a, 2, self.mom)
        a = {'max_run_res.ncpus': '[u:' + str(TEST_USER) + '=1]'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        for _ in range(3):
            j = Job(TEST_USER)
            self.server.submit(j)
        a = {'server_state': 'Scheduling'}
        self.server.expect(SERVER, a, op=NE)
        a = {'job_state=R': 1, 'euser=' + str(TEST_USER): 1}
        self.server.expect(JOB, a, attrop=PTL_AND)

    @skipOnCpuSet
    def test_finished_jobs(self):
        """
        Test for finished jobs
        """
        a = {'resources_available.ncpus': '4'}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        a = {'job_history_enable': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        a = {'Resource_List.walltime': '10', ATTR_k: 'oe'}
        j = Job(TEST_USER, attrs=a)
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'F'}, extend='x', offset=5,
                           interval=1, id=jid)

    def test_project_based_limits(self):
        """
        Test for project based limits
        """
        proj = 'testproject'
        a = {'max_run': '[p:' + proj + '=1]'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        for _ in range(5):
            j = Job(TEST_USER, attrs={ATTR_project: proj})
            self.server.submit(j)
        self.server.expect(SERVER, {'server_state': 'Scheduling'}, op=NE)
        self.server.expect(JOB, {'job_state=R': 1})

    @skipOnCpuSet
    def test_job_scheduling_order(self):
        """
        Test for job scheduling order
        """
        a = {'backfill_depth': 5}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        self.scheduler.set_sched_config({'strict_ordering': 'True'})
        a = {'resources_available.ncpus': '1'}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        a = {'state=free': 1}
        self.server.expect(VNODE, a, attrop=PTL_AND)
        a = {'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        for _ in range(6):
            j = Job(TEST_USER, attrs={'Resource_List.select': '1:ncpus=1',
                                      'Resource_List.walltime': 3600})
            self.server.submit(j)
        a = {'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        a = {'server_state': 'Scheduling'}
        self.server.expect(SERVER, a, op=NE)
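        # Six jobs were submitted but backfill_depth is 5, so the scheduler
        # should publish estimated.start_time for 5 of the queued jobs.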
        self.server.expect(JOB, {'estimated.start_time': 5},
                           count=True, op=SET)

    @skipOnCpuSet
    def test_preemption(self):
        """
        Test for preemption
        """
        a = {'log_filter': 2048}
        self.scheduler.set_sched_config(a)
        a = {'resources_available.ncpus': '1'}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        self.server.status(QUEUE)
        if 'expressq' in self.server.queues.keys():
            self.server.manager(MGR_CMD_DELETE, QUEUE, None, 'expressq')
        a = {'queue_type': 'execution'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, 'expressq')
        a = {'enabled': 'True', 'started': 'True', 'priority': 150}
        self.server.manager(MGR_CMD_SET, QUEUE, a, 'expressq')
        j = Job(TEST_USER, attrs={'Resource_List.select': '1:ncpus=1'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        j2 = Job(TEST_USER,
                 attrs={'queue': 'expressq',
                        'Resource_List.select': '1:ncpus=1'})
        j2id = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
        self.server.expect(JOB, {'job_state': 'S'}, id=jid)

    @skipOnCpuSet
    def test_preemption_qrun(self):
        """
        Test that qrun will preempt other jobs
        """
        self.server.manager(MGR_CMD_SET, NODE,
                            {'resources_available.ncpus': 1},
                            id=self.mom.shortname)
        J1 = Job(TEST_USER)
        jid1 = self.server.submit(J1)
        J2 = Job(TEST_USER)
        jid2 = self.server.submit(J2)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)
        self.server.runjob(jobid=jid2)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)

    @skipOnCpuSet
    def test_fairshare(self):
        """
        Test for fairshare
        """
        a = {'fair_share': 'true ALL',
             'fairshare_usage_res': 'ncpus*walltime',
             'unknown_shares': 10}
        self.scheduler.set_sched_config(a)
        a = {'resources_available.ncpus': 4}
        self.server.create_vnodes('vnode', a, 4, self.mom)
        a = {'Resource_List.select': '1:ncpus=4'}
        for _ in range(10):
            j = Job(TEST_USER1, a)
            self.server.submit(j)
        a = {'job_state=R': 4}
        self.server.expect(JOB, a)
        self.logger.info('testinfo: waiting for walltime accumulation')
        running_jobs = self.server.filter(JOB, {'job_state': 'R'})
        if running_jobs.values():
            for _j in running_jobs.values()[0]:
                a = {'resources_used.walltime': (NE, '00:00:00')}
                self.server.expect(JOB, a, id=_j, interval=1, max_attempts=30)
        j = Job(TEST_USER2)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid, offset=5)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        a = {'server_state': 'Scheduling'}
        self.server.expect(SERVER, a, op=NE)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
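        # TEST_USER1 has accumulated fairshare usage while TEST_USER2 has
        # none, so in the cycle just triggered the scheduler should consider
        # TEST_USER2's job (the last one submitted) first.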
        cycle = self.scheduler.cycles(start=self.server.ctime, lastN=10)
        if len(cycle) > 0:
            i = len(cycle) - 1
            while len(cycle[i].political_order) == 0:
                i -= 1
            cycle = cycle[i]
            firstconsidered = cycle.political_order[0]
            lastsubmitted = jid.split('.')[0]
            msg = 'testinfo: first job considered [' + str(firstconsidered) + \
                  '] == last submitted [' + str(lastsubmitted) + ']'
            self.logger.info(msg)
            self.assertEqual(firstconsidered, lastsubmitted)

    def test_server_hook(self):
        """
        Create a hook, import hook content that rejects all jobs, and verify
        that a job is rejected by the hook.
        """
        hook_name = "testhook"
        hook_body = "import pbs\npbs.event().reject('my custom message')\n"
        a = {'event': 'queuejob', 'enabled': 'True'}
        self.server.create_import_hook(hook_name, a, hook_body)
        self.server.manager(MGR_CMD_SET, SERVER, {'log_events': 2047},
                            expect=True)
        j = Job(TEST_USER)
        try:
            self.server.submit(j)
        except PbsSubmitError:
            pass
        self.server.log_match("my custom message")

    def test_mom_hook(self):
        """
        Create a hook, import hook content that rejects all jobs, and verify
        that a job is rejected by the hook.
        """
        hook_name = "momhook"
        hook_body = "import pbs\npbs.event().reject('my custom message')\n"
        a = {'event': 'execjob_begin', 'enabled': 'True'}
        self.server.create_import_hook(hook_name, a, hook_body)
        # The hook content is copied asynchronously; wait for the copy to occur
        self.server.log_match(".*successfully sent hook file.*" +
                              hook_name + ".PY" + ".*", regexp=True,
                              max_attempts=100, interval=5)
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        self.mom.log_match("my custom message", starttime=self.server.ctime,
                           interval=1)

    @skipOnCpuSet
    def test_shrink_to_fit(self):
        """
        Smoke test shrink to fit by setting a dedicated time to start in an
        hour and submitting a job that can run for as little as 58 minutes
        and as long as 4 hours. Expect the job's walltime to be greater than
        or equal to the minimum set.
        """
        a = {'resources_available.ncpus': 1}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
        now = time.time()
        self.scheduler.add_dedicated_time(start=now + 3600, end=now + 7200)
        j = Job(TEST_USER)
        a = {'Resource_List.max_walltime': '04:00:00',
             'Resource_List.min_walltime': '00:58:00'}
        j.set_attributes(a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        attr = {'Resource_List.walltime':
                (GE, a['Resource_List.min_walltime'])}
        self.server.expect(JOB, attr, id=jid)

    def test_submit_job_with_script(self):
        """
        Test to submit job with job script
        """
        j = Job(TEST_USER, attrs={ATTR_N: 'test'})
        j.create_script('sleep 120\n', hostname=self.server.client)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.logger.info("Testing script with extension")
        j = Job(TEST_USER)
        fn = self.du.create_temp_file(suffix=".scr", body="/bin/sleep 10",
                                      asuser=str(TEST_USER))
        try:
            jid = self.server.submit(j, script=fn)
        except PbsSubmitError as e:
            self.assertNotIn('illegal -N value', e.msg[0],
                             'qsub: Not accepted "." in job name')
        else:
            self.server.expect(JOB, {'job_state': (MATCH_RE, '[RQ]')}, id=jid)
            self.logger.info('Job submitted successfully: ' + jid)

    @skipOnCpuSet
    def test_formula_match(self):
        """
        Test for job sort formula
        """
        a = {'resources_available.ncpus': 8}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        self.scheduler.set_sched_config({'log_filter': '2048'})
        a = {'job_sort_formula': 'ncpus'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        # purposely submitting a job that is highly unlikely to run so
        # it stays Q'd
        j = Job(TEST_USER, attrs={'Resource_List.select': '1:ncpus=128'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
        a = {'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
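        # Compare the formula value the scheduler reported for the job with
        # the value PTL computes from the job's resources; they should match.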
        _f1 = self.scheduler.job_formula(jid)
        _f2 = self.server.evaluate_formula(jid, full=False)
        self.assertEqual(_f1, _f2)
        self.logger.info(str(_f1) + " = " + str(_f2) + " ... OK")

    def test_staging(self):
        """
        Test for file staging
        """
        fn = self.du.create_temp_file(asuser=str(TEST_USER))
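        # Staging directives use the form
        # <execution_path>@<storage_host>:<storage_path>: the first job
        # stages fn in from the server host as fn2, the second stages fn
        # out to fn2 on the server host.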
        a = {ATTR_stagein: fn + '2@' + self.server.hostname + ':' + fn}
        j = Job(TEST_USER, a)
        j.set_sleep_time(2)
        jid = self.server.submit(j)
        self.server.expect(JOB, 'queue', op=UNSET, id=jid, offset=2)
        a = {ATTR_stageout: fn + '@' + self.server.hostname + ':' + fn + '2'}
        j = Job(TEST_USER, a)
        j.set_sleep_time(2)
        jid = self.server.submit(j)
        self.server.expect(JOB, 'queue', op=UNSET, id=jid, offset=2)
        self.du.rm(self.server.hostname, fn, force=True, sudo=True)
        self.du.rm(self.server.hostname, fn + '2', force=True, sudo=True)

    def test_route_queue(self):
        """
        Verify that a routing queue routes a job into the appropriate
        execution queue.
        """
        a = {'queue_type': 'Execution', 'resources_min.ncpus': 1,
             'enabled': 'True', 'started': 'False'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='specialq')
        dflt_q = self.server.default_queue
        a = {'queue_type': 'route',
             'route_destinations': dflt_q + ',specialq',
             'enabled': 'True', 'started': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='routeq')
        a = {'resources_min.ncpus': 4}
        self.server.manager(MGR_CMD_SET, QUEUE, a, id=dflt_q)
        j = Job(TEST_USER, attrs={ATTR_queue: 'routeq',
                                  'Resource_List.ncpus': 1})
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_queue: 'specialq'}, id=jid)

    def test_movejob(self):
        """
        Verify that a job can be moved to a queue other than the one it was
        originally submitted to
        """
        a = {'queue_type': 'Execution', 'enabled': 'True', 'started': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='solverq')
        a = {'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        self.server.movejob(jid, 'solverq')
        a = {'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        self.server.expect(JOB, {ATTR_queue: 'solverq', 'job_state': 'R'},
                           attrop=PTL_AND)

    @skipOnCpuSet
    def test_by_queue(self):
        """
        Test by_queue scheduling policy
        """
        a = OrderedDict()
        a['queue_type'] = 'execution'
        a['enabled'] = 'True'
        a['started'] = 'True'
        a['priority'] = 200
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='p1')
        a['priority'] = 400
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='p2')
        a = {'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        self.scheduler.set_sched_config({'by_queue': 'True'})
        a = {'resources_available.ncpus': 8}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'p1'}
        j = Job(TEST_USER, a)
        j1id = self.server.submit(j)
        a = {'Resource_List.select': '1:ncpus=8', ATTR_queue: 'p1'}
        j = Job(TEST_USER, a)
        j2id = self.server.submit(j)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'p1'}
        j = Job(TEST_USER, a)
        j3id = self.server.submit(j)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'p2'}
        j = Job(TEST_USER, a)
        j4id = self.server.submit(j)
        a = {'Resource_List.select': '1:ncpus=8', ATTR_queue: 'p2'}
        j = Job(TEST_USER, a)
        j5id = self.server.submit(j)
        a = {'Resource_List.select': '1:ncpus=8', ATTR_queue: 'p2'}
        j = Job(TEST_USER, a)
        j6id = self.server.submit(j)
        a = {'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        a = {'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        # Given the node configuration of 8 cpus, the only jobs that can run
        # are j4id, j1id and j3id
        self.server.expect(JOB, {'job_state=R': 3})
        cycle = self.scheduler.cycles(start=self.server.ctime, lastN=2)
        if len(cycle) > 0:
            i = len(cycle) - 1
            while len(cycle[i].political_order) == 0:
                i -= 1
            cycle = cycle[i]
            p1jobs = [j1id, j2id, j3id]
            p2jobs = [j4id, j5id, j6id]
            jobs = [j1id, j2id, j3id, j4id, j5id, j6id]
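            # by_queue considers queues in priority order, so all of p2's
            # jobs (priority 400) should appear in the cycle's political
            # order before any of p1's jobs (priority 200).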
            job_order = map(lambda j: j.split('.')[0], p2jobs + p1jobs)
            self.logger.info(
                'Political order: ' + ','.join(cycle.political_order))
            self.logger.info('Expected order: ' + ','.join(job_order))
            self.assertTrue(cycle.political_order == job_order)

    @skipOnCpuSet
    def test_round_robin(self):
        """
        Test round_robin scheduling policy
        """
        a = OrderedDict()
        a['queue_type'] = 'execution'
        a['enabled'] = 'True'
        a['started'] = 'True'
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='p1')
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='p2')
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='p3')
        a = {'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        self.scheduler.set_sched_config({'round_robin': 'true ALL'})
        a = {'resources_available.ncpus': 9}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname,
                            expect=True)
        jids = []
        queues = ['p1', 'p2', 'p3']
        queue = queues[0]
        for i in range(9):
            if (i != 0) and (i % 3 == 0):
                del queues[0]
                queue = queues[0]
            a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: queue}
            j = Job(TEST_USER, a)
            jids.append(self.server.submit(j))
        start_time = int(time.time())
        a = {'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        a = {'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        self.server.expect(JOB, {'job_state=R': 9})
        cycle = self.scheduler.cycles(start=start_time, end=int(time.time()))
        if len(cycle) > 0:
            i = len(cycle) - 1
            while ((i >= 0) and (len(cycle[i].political_order) == 0)):
                i -= 1
            if i < 0:
                self.assertTrue(False, 'failed to find political order')
            cycle = cycle[i]
            jobs = [jids[0], jids[3], jids[6], jids[1], jids[4], jids[7],
                    jids[2], jids[5], jids[8]]
            job_order = map(lambda j: j.split('.')[0], jobs)
            self.logger.info(
                'Political order: ' + ','.join(cycle.political_order))
            self.logger.info('Expected order: ' + ','.join(job_order))
            self.assertTrue(cycle.political_order == job_order)

    def test_pbs_probe(self):
        """
        Verify that pbs_probe runs and returns 0 when no errors are detected
        """
        probe = os.path.join(self.server.pbs_conf['PBS_EXEC'], 'sbin',
                             'pbs_probe')
        ret = self.du.run_cmd(self.server.hostname, [probe], sudo=True)
        self.assertEqual(ret['rc'], 0)

    def test_printjob(self):
        """
        Verify that printjob can be executed
        """
        j = Job(TEST_USER)
        jid = self.server.submit(j)
        a = {'job_state': 'R', 'substate': 42}
        self.server.expect(JOB, a, id=jid)
        printjob = os.path.join(self.mom.pbs_conf['PBS_EXEC'], 'bin',
                                'printjob')
        jbfile = os.path.join(self.mom.pbs_conf['PBS_HOME'], 'mom_priv',
                              'jobs', jid + '.JB')
        ret = self.du.run_cmd(self.mom.hostname, cmd=[printjob, jbfile],
                              sudo=True)
        self.assertEqual(ret['rc'], 0)

    def test_comm_service(self):
        """
        Examples to demonstrate how to start/stop/signal the pbs_comm service
        """
        comm = Comm()
        comm.isUp()
        comm.signal('-HUP')
        comm.stop()
        comm.start()
        comm.log_match('Thread')

    def test_add_server_dyn_res(self):
        """
        Examples to demonstrate how to add a server dynamic resource script
        """
        attr = {}
        attr['type'] = 'long'
        self.server.manager(MGR_CMD_CREATE, RSC, attr, id='foo')
        body = "echo 10"
        self.scheduler.add_server_dyn_res("foo", script_body=body)
        self.scheduler.add_resource("foo", apply=True)
        j1 = Job(TEST_USER)
        j1.set_attributes({'Resource_List': 'foo=5'})
        j1id = self.server.submit(j1)
        a = {'job_state': 'R', 'Resource_List.foo': '5'}
        self.server.expect(JOB, a, id=j1id)

    @skipOnCpuSet
    def test_schedlog_preempted_info(self):
        """
        Demonstrate how to retrieve a list of jobs that had to be preempted in
        order to run a high priority job
        """
        # run the preemption smoketest
        self.test_preemption()
        # Analyze the scheduler log
        a = PBSLogAnalyzer()
        a.analyze_scheduler_log(self.scheduler.logfile,
                                start=self.server.ctime)
        for cycle in a.scheduler.cycles:
            if cycle.preempted_jobs:
                self.logger.info('Preemption info: ' +
                                 str(cycle.preempted_jobs))

    @skipOnCpuSet
    def test_basic(self):
        """
        basic express queue preemption test
        """
        try:
            self.server.manager(MGR_CMD_DELETE, QUEUE, id="expressq")
        except:
            pass
        a = {'queue_type': 'e',
             'started': 'True',
             'enabled': 'True',
             'Priority': 150}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, "expressq")
        a = {'resources_available.ncpus': 4, 'resources_available.mem': '2gb'}
        self.server.create_vnodes('vnode', a, 4, self.mom)
        j1 = Job(TEST_USER)
        j1.set_attributes(
            {'Resource_List.select': '4:ncpus=4',
             'Resource_List.walltime': 3600})
        j1id = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=j1id)
        j2 = Job(TEST_USER)
        j2.set_attributes(
            {'Resource_List.select': '1:ncpus=4',
             'Resource_List.walltime': 3600,
             'queue': 'expressq'})
        j2id = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'S'}, id=j1id)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
        self.server.cleanup_jobs()
        self.server.expect(SERVER, {'total_jobs': 0})
        self.server.manager(MGR_CMD_DELETE, QUEUE, id="expressq")

    @skipOnCpuSet
    def test_basic_ja(self):
        """
        basic express queue preemption test with job array
        """
        try:
            self.server.manager(MGR_CMD_DELETE, QUEUE, id="expressq")
        except:
            pass
        a = {'queue_type': 'e',
             'started': 'True',
             'enabled': 'True',
             'Priority': 150}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, "expressq")
        a = {'resources_available.ncpus': 4, 'resources_available.mem': '2gb'}
        self.server.create_vnodes('vnode', a, 4, self.mom)
        j1 = Job(TEST_USER)
        j1.set_attributes({'Resource_List.select': '4:ncpus=4',
                           'Resource_List.walltime': 3600})
        j1id = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=j1id)
        j2 = Job(TEST_USER)
        j2.set_attributes({'Resource_List.select': '1:ncpus=4',
                           'Resource_List.walltime': 3600,
                           'queue': 'expressq',
                           ATTR_J: '1-3'})
        j2id = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'S'}, id=j1id)
        self.server.expect(JOB, {'job_state=R': 3}, count=True,
                           id=j2id, extend='t')
        self.server.cleanup_jobs()
        self.server.expect(SERVER, {'total_jobs': 0})
        self.server.manager(MGR_CMD_DELETE, QUEUE, id="expressq")

    def submit_reserv(self, resv_start, ncpus, resv_dur):
        """
        Helper to submit a reservation and return the result of waiting for
        it to be confirmed
        """
        a = {'Resource_List.select': '1:ncpus=%d' % ncpus,
             'Resource_List.place': 'free',
             'reserve_start': int(resv_start),
             'reserve_duration': int(resv_dur)
             }
        r = Reservation(TEST_USER, attrs=a)
        rid = self.server.submit(r)
        try:
            a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
            d = self.server.expect(RESV, a, id=rid)
        except PtlExpectError as e:
            d = e.rv
        return d

    @skipOnCpuSet
    def test_shrink_to_fit_resv_barrier(self):
        """
        Test shrink to fit by creating one reservation having ncpus=1,
        starting in 3 hours with a duration of two hours. A STF job with
        a min_walltime of 10 min. and max_walltime of 20.5 hrs will shrink
        its walltime to less than or equal to 3 hours and greater than or
        equal to 10 mins.
        """
        a = {'resources_available.ncpus': 1}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
        now = time.time()
        resv_dur = 7200
        resv_start = now + 10800
        d = self.submit_reserv(resv_start, 1, resv_dur)
        self.assertTrue(d)
        j = Job(TEST_USER)
        a = {'Resource_List.ncpus': '1'}
        j.set_attributes(a)
        jid = self.server.submit(j)
        j2 = Job(TEST_USER)
        a = {'Resource_List.max_walltime': '20:30:00',
             'Resource_List.min_walltime': '00:10:00'}
        j2.set_attributes(a)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        attr = {'Resource_List.walltime': (LE, '03:00:00')}
        self.server.expect(JOB, attr, id=jid2)
        attr = {'Resource_List.walltime': (GE, '00:10:00')}
        self.server.expect(JOB, attr, id=jid2)

    @skipOnCpuSet
    def test_job_sort_formula_threshold(self):
        """
        Test job_sort_formula_threshold basic behavior
        """
        self.scheduler.set_sched_config({'log_filter': '2048'})
        a = {'resources_available.ncpus': 1}
        self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
        a = {'job_sort_formula':
             'ceil(fabs(-ncpus*(mem/100.00)*sqrt(walltime)))'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
        a = {'job_sort_formula_threshold': '7'}
        self.server.manager(MGR_CMD_SET, SCHED, a)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
        a = {'Resource_List.select': '1:ncpus=1:mem=300kb',
             'Resource_List.walltime': 4}
        J1 = Job(TEST_USER1, attrs=a)
        a = {'Resource_List.select': '1:ncpus=1:mem=350kb',
             'Resource_List.walltime': 4}
        J2 = Job(TEST_USER1, attrs=a)
        a = {'Resource_List.select': '1:ncpus=1:mem=380kb',
             'Resource_List.walltime': 4}
        J3 = Job(TEST_USER1, attrs=a)
        a = {'Resource_List.select': '1:ncpus=1:mem=440kb',
             'Resource_List.walltime': 4}
        J4 = Job(TEST_USER1, attrs=a)
        j1id = self.server.submit(J1)
        j2id = self.server.submit(J2)
        j3id = self.server.submit(J3)
        j4id = self.server.submit(J4)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        rv = self.server.expect(SERVER, {'server_state': 'Scheduling'}, op=NE)
        self.logger.info("Checking the job state of " + j4id)
        self.server.expect(JOB, {'job_state': 'R'}, id=j4id, max_attempts=30,
                           interval=2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j3id, max_attempts=30,
                           interval=2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j2id, max_attempts=30,
                           interval=2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j1id, max_attempts=30,
                           interval=2)
        msg = "Checking the job state of %s, runs after %s is deleted" % (
            j3id, j4id)
        self.logger.info(msg)
        self.server.deljob(id=j4id, wait=True)
        self.server.expect(JOB, {'job_state': 'R'}, id=j3id, max_attempts=30,
                           interval=2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j2id, max_attempts=30,
                           interval=2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j1id, max_attempts=30,
                           interval=2)
        self.scheduler.log_match(j1id + ";Formula Evaluation = 6",
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        m = ";Job's formula value 6 is under threshold 7"
        self.scheduler.log_match(j1id + m,
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        m = ";Job is under job_sort_formula threshold value"
        self.scheduler.log_match(j1id + m,
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        self.scheduler.log_match(j2id + ";Formula Evaluation = 7",
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        m = ";Job's formula value 7 is under threshold 7"
        self.scheduler.log_match(j2id + m,
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        m = ";Job is under job_sort_formula threshold value"
        self.scheduler.log_match(j1id + m,
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        self.scheduler.log_match(j3id + ";Formula Evaluation = 8",
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        self.scheduler.log_match(j4id + ";Formula Evaluation = 9",
                                 regexp=True, starttime=self.server.ctime,
                                 max_attempts=10, interval=2)
        # Make sure we can qrun a job under the threshold
        self.server.deljob(id=j3id, wait=True)
        rv = self.server.expect(SERVER, {'server_state': 'Scheduling'}, op=NE)
        self.server.expect(JOB, {ATTR_state: 'Q'}, id=j1id)
        self.server.runjob(jobid=j1id)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j1id)

    def isSuspended(self, ppid):
        """
        Check whether <ppid> is in the suspended state; return True if
        <ppid> is suspended, else return False
        """
        state = 'T'
        rv = self.pu.get_proc_state(self.mom.shortname, ppid)
        if rv != state:
            return False
        childlist = self.pu.get_proc_children(self.mom.shortname,
                                              ppid)
        for child in childlist:
            rv = self.pu.get_proc_state(self.mom.shortname, child)
            if rv != state:
                return False
        return True

    def do_preempt_config(self):
        """
        Do Scheduler Preemption configuration
        """
        _t = ('\"express_queue, normal_jobs, server_softlimits,' +
              ' queue_softlimits\"')
        a = {'preempt_prio': _t}
        self.scheduler.set_sched_config(a)
        try:
            self.server.manager(MGR_CMD_DELETE, QUEUE, None, 'expressq')
        except:
            pass
        a = {'queue_type': 'e',
             'started': 'True',
             'Priority': 150,
             'enabled': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, 'expressq')

    def common_stuff(self, isJobArray=False, isWithPreemt=False):
        """
        Common steps for these tests: submit a job (or job array), check its
        state, and suspend it (directly or via preemption)
        """
        if isJobArray:
            a = {'resources_available.ncpus': 3}
        else:
            a = {'resources_available.ncpus': 1}
        self.server.create_vnodes('vn', a, 1,
                                  mom=self.mom)
        if isWithPreemt:
            self.do_preempt_config()
        j1 = Job(TEST_USER, attrs={'Resource_List.walltime': 100})
        if isJobArray:
            j1.set_attributes({ATTR_J: '1-3'})
        j1id = self.server.submit(j1)
        if isJobArray:
            a = {'job_state=R': 3, 'substate=42': 3}
        else:
            a = {'job_state': 'R', 'substate': 42}
        self.server.expect(JOB, a, extend='t')
        if isWithPreemt:
            j2 = Job(TEST_USER, attrs={'Resource_List.walltime': 100,
                                       'queue': 'expressq'})
            if isJobArray:
                j2.set_attributes({ATTR_J: '1-3'})
            j2id = self.server.submit(j2)
            self.assertNotEqual(j2id, None)
            if isJobArray:
                a = {'job_state=R': 3, 'substate=42': 3}
            else:
                a = {'job_state': 'R', 'substate': 42}
            self.server.expect(JOB, a, id=j2id, extend='t')
        else:
            self.server.sigjob(j1id, 'suspend')
        if isJobArray:
            a = {'job_state=S': 3}
        else:
            a = {'job_state': 'S'}
        self.server.expect(JOB, a, id=j1id, extend='t')
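        # Beyond the server-side 'S' state, verify at the OS level that the
        # job's session (and its children) is actually stopped.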
        jobs = self.server.status(JOB, id=j1id)
        for job in jobs:
            if 'session_id' in job:
                self.server.expect(JOB, {'session_id': self.isSuspended},
                                   id=job['id'])
        if isWithPreemt:
            return (j1id, j2id)
        else:
            return j1id

    @skipOnCpuSet
    def test_suspend_job_with_preempt(self):
        """
        Test Suspend of Job using Scheduler Preemption
        """
        self.common_stuff(isWithPreemt=True)

    @skipOnCpuSet
    def test_resume_job_with_preempt(self):
        """
        Test Resume of Job using Scheduler Preemption
        """
        (j1id, j2id) = self.common_stuff(isWithPreemt=True)
        self.server.delete(j2id)
        self.server.expect(JOB, {'job_state': 'R', 'substate': 42},
                           id=j1id)
        jobs = self.server.status(JOB, id=j1id)
        for job in jobs:
            if 'session_id' in job:
                self.server.expect(JOB,
                                   {'session_id': (NOT, self.isSuspended)},
                                   id=job['id'])

    @skipOnCpuSet
    def test_suspend_job_array_with_preempt(self):
        """
        Test Suspend of Job array using Scheduler Preemption
        """
        self.common_stuff(isJobArray=True, isWithPreemt=True)

    @skipOnCpuSet
    def test_resume_job_array_with_preempt(self):
        """
        Test Resume of Job array using Scheduler Preemption
        """
        (j1id, j2id) = self.common_stuff(isJobArray=True, isWithPreemt=True)
        self.server.delete(j2id)
        self.server.expect(JOB,
                           {'job_state=R': 3, 'substate=42': 3},
                           extend='t')
        jobs = self.server.status(JOB, id=j1id, extend='t')
        for job in jobs:
            if 'session_id' in job:
                self.server.expect(JOB,
                                   {'session_id': (NOT, self.isSuspended)},
                                   id=job['id'])

    def create_resource_helper(self, r, t, f, c):
        """
        Create a resource with associated type, flag, and control flag
        r - The resource name
        t - Type of the resource
        f - Permissions/flags associated with the resource
        c - Control flags
        This method handles expected errors for invalid settings
        """
        expect_error = self.expect_error(t, f)
        attr = {}
        if t is not None:
            attr['type'] = t
        if f is not None:
            attr['flag'] = f
        if c:
            if 'flag' in attr:
                attr['flag'] += c
            else:
                attr['flag'] = c
        if len(attr) == 0:
            attr = None
        try:
            rc = self.server.manager(MGR_CMD_CREATE, RSC, attr, id=r,
                                     logerr=False)
            msg = None
        except PbsManagerError as e:
            rc = e.rc
            msg = e.msg
        if expect_error:
            if msg:
                m = 'Expected error contains "Erroneous to have"'
                self.logger.info(m + ' in ' + msg[0])
                self.assertTrue('Erroneous to have' in msg[0])
            self.assertNotEqual(rc, 0)
            return False
        else:
            self.assertEqual(rc, 0)
            self.server.manager(MGR_CMD_LIST, RSC, id=r)
            rv = self.server.resources[r].attributes['type']
            if t is None:
                self.assertEqual(rv, 'string')
            else:
                self.assertEqual(rv, t)
            _f = ''
            if f is not None:
                _f = f
            if c is not None:
                _f += c
            if _f:
                rv = self.server.resources[r].attributes['flag']
                self.assertEqual(sorted(rv), sorted(_f))
        return True

    def expect_error(self, t, f):
        """
        Returns true for invalid combinations of flag and/or type
        """
        if (f in ['nh', 'f', 'fh', 'n', 'q'] and
                t in [None, 'string', 'string_array', 'boolean']):
            return True
        if (f == 'n' and t in [None, 'long', 'float', 'size']):
            return True
        if (f == 'f' and t in [None, 'long', 'float', 'size']):
            return True
        return False

    def test_resource_create(self):
        """
        Test behavior of resource creation by permuting over all possible and
        supported types and flags
        """
        rc = self.server.manager(MGR_CMD_CREATE, RSC, id=self.resc_name)
        self.assertEqual(rc, 0)
        rc = self.server.manager(MGR_CMD_LIST, RSC, id=self.resc_name)
        self.assertEqual(rc, 0)
        rsc = self.server.resources[self.resc_name]
        self.assertEqual(rsc.attributes['type'], 'string')
        self.logger.info(self.server.logprefix +
                         ' verify that default resource type is string...OK')
        self.logger.info(self.server.logprefix +
                         ' verify that duplicate resource creation fails')
        # check that duplicate is not allowed
        try:
            rc = self.server.manager(MGR_CMD_CREATE, RSC, None,
                                     id=self.resc_name,
                                     logerr=True)
        except PbsManagerError as e:
            rc = e.rc
            msg = e.msg
            self.assertNotEqual(rc, 0)
            self.assertTrue('Duplicate entry' in msg[0])
            self.logger.info('Expected error: Duplicate entry in ' + msg[0] +
                             ' ...OK')
            self.assertNotEqual(e.rc, 0)
        rc = self.server.manager(MGR_CMD_DELETE, RSC, id=self.resc_name)
        self.assertEqual(rc, 0)
        for t in self.resc_types:
            for f in self.resc_flags:
                for c in self.resc_flags_ctl:
                    rv = self.create_resource_helper(self.resc_name, t, f, c)
                    if rv:
                        rc = self.server.manager(MGR_CMD_DELETE, RSC,
                                                 id=self.resc_name)
                        self.assertEqual(rc, 0)
                    self.logger.info("")

    def delete_resource_helper(self, r, t, f, c, obj_type, obj_id):
        """
        Verify behavior upon deleting a resource that is set on a PBS object.
        r - The resource to create and later on delete
        t - The type of resource
        f - The permissions/flags of the resource
        c - The control flags of the resource
        obj_type - The object type (server, queue, node, job, reservation) on
        which the resource is set.
        obj_id - The object identifier/name
        """
        ar = 'resources_available.' + r
        rv = self.create_resource_helper(self.resc_name, t, f, c)
        if rv:
            if t in ['long', 'float', 'size', 'boolean']:
                val = 0
            else:
                val = 'abc'
            if obj_type in [JOB, RESV]:
                if obj_type == JOB:
                    j = Job(TEST_USER1, {'Resource_List.' + r: val})
                else:
                    j = Reservation(TEST_USER1, {'Resource_List.' + r: val})
                try:
                    jid = self.server.submit(j)
                except PbsSubmitError as e:
                    jid = e.rv
                if c is not None and ('r' in c or 'i' in c):
                    self.assertEqual(jid, None)
                    self.logger.info('Verify that job/resv cannot request '
                                     'an invisible or read-only resource...OK')
                    self.server.manager(MGR_CMD_DELETE, RSC, id=r)
                    # done with the test case, just return
                    return
                if obj_type == RESV:
                    a = {'reserve_state': (MATCH_RE, "RESV_CONFIRMED|2")}
                    self.server.expect(RESV, a, id=jid)
                self.assertNotEqual(jid, None)
            else:
                self.server.manager(MGR_CMD_SET, obj_type, {ar: val},
                                    id=obj_id, expect=True)
            try:
                rc = self.server.manager(MGR_CMD_DELETE, RSC, id=r,
                                         logerr=False)
                msg = None
            except PbsManagerError as e:
                rc = e.rc
                msg = e.msg
            if obj_type in [JOB, RESV]:
                self.assertNotEqual(rc, 0)
                if msg:
                    m = "Resource busy on " + PBS_OBJ_MAP[obj_type]
                    self.logger.info('Expecting qmgr error: ' + m + ' in ' +
                                     msg[0])
                    self.assertTrue(m in msg[0])
                self.server.delete(jid)
                self.server.expect(obj_type, 'queue', op=UNSET)
                self.server.manager(MGR_CMD_DELETE, RSC, id=r)
            else:
                self.assertEqual(rc, 0)
                d = self.server.status(obj_type, ar, id=obj_id)
                if d and len(d) > 0:
                    self.assertFalse(ar in d[0])

    @timeout(720)
    def test_resource_delete(self):
        """
        Verify behavior of resource deletion when the resource is defined
        on a PBS object by varying over all permutations of types and flags
        """
        self.obj_map = {QUEUE: self.server.default_queue,
                        SERVER: self.server.name,
                        NODE: self.mom.shortname,
                        JOB: None, RESV: None}
        try:
            self.server.status(RSC, id=self.resc_name)
            self.server.manager(MGR_CMD_DELETE, RSC,
                                id=self.resc_name, logerr=False)
        except:
            pass
        for k in self.objs:
            if k not in self.obj_map:
                self.logger.error('can not map object ' + k)
                continue
            v = self.obj_map[k]
            for t in self.resc_types:
                for f in self.resc_flags:
                    for c in self.resc_flags_ctl:
                        self.delete_resource_helper(
                            self.resc_name, t, f, c, k, v)
                        self.logger.info("")

    def setup_fs(self, formula):
        # change resource group file and validate after all the changes are in
        self.scheduler.add_to_resource_group('grp1', 100, 'root', 60,
                                             validate=False)
        self.scheduler.add_to_resource_group('grp2', 200, 'root', 40,
                                             validate=False)
        self.scheduler.add_to_resource_group('pbsuser1', 101, 'grp1', 40,
                                             validate=False)
        self.scheduler.add_to_resource_group('pbsuser2', 102, 'grp1', 20,
                                             validate=False)
        self.scheduler.add_to_resource_group('pbsuser3', 201, 'grp2', 30,
                                             validate=False)
        self.scheduler.add_to_resource_group('pbsuser4', 202, 'grp2', 10,
                                             validate=True)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduler_iteration': 7})
        a = {'fair_share': 'True', 'fairshare_decay_time': '24:00:00',
             'fairshare_decay_factor': 0.5, 'fairshare_usage_res': formula,
             'log_filter': '0'}
        self.scheduler.set_sched_config(a)

    @skipOnCpuSet
    def test_fairshare_enhanced(self):
        """
        Test the basic fairshare behavior with custom resources for math
        module
        """
        rv = self.server.add_resource('foo1', 'float', 'nh')
        self.assertTrue(rv)
        # Set scheduler fairshare usage formula
        self.setup_fs('ceil(fabs(-ncpus*(foo1/100.00)*sqrt(100)))')
        node_attr = {'resources_available.ncpus': 1,
                     'resources_available.foo1': 5000}
        self.server.manager(MGR_CMD_SET, NODE, node_attr, self.mom.shortname)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
        job_attr = {'Resource_List.select': '1:ncpus=1:foo1=20',
                    'Resource_List.walltime': 4}
        J1 = Job(TEST_USER2, attrs=job_attr)
        J1.set_sleep_time(4)
        J2 = Job(TEST_USER3, attrs=job_attr)
        J2.set_sleep_time(4)
        J3 = Job(TEST_USER1, attrs=job_attr)
        J3.set_sleep_time(4)
        j1id = self.server.submit(J1)
        j2id = self.server.submit(J2)
        j3id = self.server.submit(J3)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        rv = self.server.expect(SERVER, {'server_state': 'Scheduling'}, op=NE)
        self.logger.info("Checking the job state of " + j3id)
        self.server.expect(JOB, {'job_state': 'R'}, id=j3id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j2id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j1id)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        msg = "Checking the job state of " + j2id + ", runs after "
        msg += j3id + " completes"
        self.logger.info(msg)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j1id)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        msg = "Checking the job state of " + j1id + ", runs after "
        msg += j2id + " completes"
        self.logger.info(msg)
        self.server.expect(JOB, {'job_state': 'R'}, id=j1id)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.server.log_match(j1id + ";Exit_status")
        # query fairshare and check usage
        fs1 = self.scheduler.query_fairshare(name=str(TEST_USER1))
        self.logger.info('Checking ' + str(fs1.usage) + " == 3")
        self.assertEqual(fs1.usage, 3)
        fs2 = self.scheduler.query_fairshare(name=str(TEST_USER2))
        self.logger.info('Checking ' + str(fs2.usage) + " == 3")
        self.assertEqual(fs2.usage, 3)
        fs3 = self.scheduler.query_fairshare(name=str(TEST_USER3))
        self.logger.info('Checking ' + str(fs3.usage) + " == 3")
        self.assertEqual(fs3.usage, 3)
        fs4 = self.scheduler.query_fairshare(name=str(TEST_USER4))
        self.logger.info('Checking ' + str(fs4.usage) + " == 1")
        self.assertEqual(fs4.usage, 1)
        # Check the scheduler usage file whether it's updating or not
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
        J1 = Job(TEST_USER4, attrs=job_attr)
        J1.set_sleep_time(4)
        J2 = Job(TEST_USER2, attrs=job_attr)
        J2.set_sleep_time(4)
        J3 = Job(TEST_USER1, attrs=job_attr)
        J3.set_sleep_time(4)
        j1id = self.server.submit(J1)
        j2id = self.server.submit(J2)
        j3id = self.server.submit(J3)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        rv = self.server.expect(SERVER, {'server_state': 'Scheduling'}, op=NE)
        self.logger.info("Checking the job state of " + j1id)
        self.server.expect(JOB, {'job_state': 'R'}, id=j1id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j2id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j3id)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        msg = "Checking the job state of " + j3id + ", runs after "
        msg += j1id + " completes"
        self.logger.info(msg)
        self.server.expect(JOB, {'job_state': 'R'}, id=j3id)
        self.server.expect(JOB, {'job_state': 'Q'}, id=j2id)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        msg = "Checking the job state of " + j2id + ", runs after "
        msg += j1id + " completes"
        self.logger.info(msg)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.server.log_match(j2id + ";Exit_status")
        # query fairshare and check usage
        fs1 = self.scheduler.query_fairshare(name=str(TEST_USER1))
        self.logger.info('Checking ' + str(fs1.usage) + " == 5")
        self.assertEqual(fs1.usage, 5)
        fs2 = self.scheduler.query_fairshare(name=str(TEST_USER2))
        self.logger.info('Checking ' + str(fs2.usage) + " == 5")
        self.assertEqual(fs2.usage, 5)
        fs3 = self.scheduler.query_fairshare(name=str(TEST_USER3))
        self.logger.info('Checking ' + str(fs3.usage) + " == 3")
        self.assertEqual(fs3.usage, 3)
        fs4 = self.scheduler.query_fairshare(name=str(TEST_USER4))
        self.logger.info('Checking ' + str(fs4.usage) + " == 3")
        self.assertEqual(fs4.usage, 3)

    @checkModule("pexpect")
    def test_interactive_job(self):
        """
        Submit an interactive job
        """
        cmd = 'sleep 10'
        j = Job(TEST_USER, attrs={ATTR_inter: ''})
        j.interactive_script = [('hostname', '.*'),
                                (cmd, '.*')]
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.server.delete(jid)
        self.server.expect(JOB, 'queue', op=UNSET, id=jid)

    def test_man_pages(self):
        """
        Test basic functionality of man pages
        """
        pbs_conf = self.du.parse_pbs_config(self.server.shortname)
        manpath = os.path.join(pbs_conf['PBS_EXEC'], "share", "man")
        pbs_cmnds = ["pbsnodes", "qsub"]
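        # Point man(1) at PBS_EXEC/share/man so the PBS man pages are found
        # regardless of the system's default MANPATH.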
        os.environ['MANPATH'] = manpath
        for pbs_cmd in pbs_cmnds:
            cmd = "man %s" % pbs_cmd
            rc = self.du.run_cmd(cmd=cmd)
            msg = "Error while retrieving man page of %s" % pbs_cmd
            msg += " command: %s" % rc['err']
            self.assertEqual(rc['rc'], 0, msg)
            msg = "Successfully retrieved man page for"
            msg += " %s command" % pbs_cmd
            self.logger.info(msg)