pbs_qsub_direct_write.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. class TestQsub_direct_write(TestFunctional):
  38. """
  39. validate qsub direct write option.
  40. """
  41. def setUp(self):
  42. """
  43. Default setup and variable declaration
  44. """
  45. TestFunctional.setUp(self)
  46. self.msg = "Job is sleeping for 10 secs as job should be running"
  47. self.msg += " at the time we check for directly written files"
  48. def test_direct_write_when_job_succeeds(self):
  49. """
  50. submit a sleep job and make sure that the std_files
  51. are getting directly written to the mapped directory
  52. when direct_files option is used.
  53. """
  54. j = Job(TEST_USER, attrs={ATTR_k: 'doe'})
  55. j.set_sleep_time(10)
  56. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  57. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  58. self.mom.add_config(
  59. {'$usecp': self.server.hostname + ':' + sub_dir
  60. + ' ' + mapping_dir})
  61. self.mom.restart()
  62. jid = self.server.submit(j, submit_dir=sub_dir)
  63. self.logger.info(self.msg)
  64. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  65. file_count = len([name for name in os.listdir(
  66. mapping_dir) if os.path.isfile(os.path.join(mapping_dir, name))])
  67. self.assertEqual(2, file_count)
  68. self.server.expect(JOB, {ATTR_k: 'doe'}, id=jid)
  69. def test_direct_write_output_file(self):
  70. """
  71. submit a sleep job and make sure that the output file
  72. are getting directly written to the mapped directory
  73. when direct_files option is used with o option.
  74. """
  75. j = Job(TEST_USER, attrs={ATTR_k: 'do'})
  76. j.set_sleep_time(10)
  77. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  78. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  79. self.mom.add_config(
  80. {'$usecp': self.server.hostname + ':' + sub_dir
  81. + ' ' + mapping_dir})
  82. self.mom.restart()
  83. jid = self.server.submit(j, submit_dir=sub_dir)
  84. self.logger.info(self.msg)
  85. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  86. for name in os.listdir(mapping_dir):
  87. p = re.search('STDIN.e*', name)
  88. if p:
  89. self.logger.info('Match found: ' + p.group())
  90. else:
  91. self.assertTrue(False)
  92. file_count = len([name for name in os.listdir(
  93. mapping_dir) if os.path.isfile(os.path.join(mapping_dir, name))])
  94. self.assertEqual(1, file_count)
  95. self.server.expect(JOB, {ATTR_k: 'do'}, id=jid)
  96. def test_direct_write_error_file(self):
  97. """
  98. submit a sleep job and make sure that the error file
  99. are getting directly written to the mapped directory
  100. when direct_files option is used with e option.
  101. """
  102. j = Job(TEST_USER, attrs={ATTR_k: 'de'})
  103. j.set_sleep_time(10)
  104. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  105. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  106. self.mom.add_config(
  107. {'$usecp': self.server.hostname + ':' + sub_dir
  108. + ' ' + mapping_dir})
  109. self.mom.restart()
  110. jid = self.server.submit(j, submit_dir=sub_dir)
  111. self.logger.info(self.msg)
  112. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  113. for name in os.listdir(mapping_dir):
  114. p = re.search('STDIN.e*', name)
  115. if p:
  116. self.logger.info('Match found: ' + p.group())
  117. else:
  118. self.assertTrue(False)
  119. file_count = len([name for name in os.listdir(
  120. mapping_dir) if os.path.isfile(os.path.join(mapping_dir, name))])
  121. self.assertEqual(1, file_count)
  122. self.server.expect(JOB, {ATTR_k: 'de'}, id=jid)
  123. def test_direct_write_error_custom_path(self):
  124. """
  125. submit a sleep job and make sure that the files
  126. are getting directly written to the custom path
  127. provided in -e and -o option even when -doe is set.
  128. """
  129. tmp_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  130. err_file = os.path.join(tmp_dir, 'error_file')
  131. out_file = os.path.join(tmp_dir, 'output_file')
  132. a = {ATTR_e: err_file, ATTR_o: out_file, ATTR_k: 'doe'}
  133. j = Job(TEST_USER, attrs=a)
  134. j.set_sleep_time(10)
  135. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  136. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  137. self.mom.add_config(
  138. {'$usecp': self.server.hostname + ':' + sub_dir
  139. + ' ' + mapping_dir})
  140. self.mom.restart()
  141. jid = self.server.submit(j, submit_dir=sub_dir)
  142. self.logger.info(self.msg)
  143. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  144. file_count = len([name for name in os.listdir(
  145. tmp_dir) if os.path.isfile(os.path.join(tmp_dir, name))])
  146. self.assertEqual(2, file_count)
  147. self.server.expect(JOB, {ATTR_k: 'doe'}, id=jid)
  148. def test_direct_write_error_custom_dir(self):
  149. """
  150. submit a sleep job and make sure that the files
  151. are getting directly written to the custom dir
  152. provided in -e and -o option even when -doe is set.
  153. """
  154. tmp_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  155. a = {ATTR_e: tmp_dir, ATTR_o: tmp_dir, ATTR_k: 'doe'}
  156. j = Job(TEST_USER, attrs=a)
  157. j.set_sleep_time(10)
  158. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  159. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  160. self.mom.add_config(
  161. {'$usecp': self.server.hostname + ':' + sub_dir
  162. + ' ' + mapping_dir})
  163. self.mom.restart()
  164. jid = self.server.submit(j, submit_dir=sub_dir)
  165. self.logger.info(self.msg)
  166. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  167. file_count = len([name for name in os.listdir(
  168. tmp_dir) if os.path.isfile(os.path.join(tmp_dir, name))])
  169. self.assertEqual(2, file_count)
  170. self.server.expect(JOB, {ATTR_k: 'doe'}, id=jid)
  171. def test_direct_write_default_qsub_arguments(self):
  172. """
  173. submit a sleep job and make sure that the std_files
  174. are getting directly written to the mapped directory
  175. when default_qsub_arguments is set to -kdoe.
  176. """
  177. j = Job(TEST_USER)
  178. j.set_sleep_time(10)
  179. self.server.manager(MGR_CMD_SET, SERVER, {
  180. 'default_qsub_arguments': '-kdoe'})
  181. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  182. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  183. self.mom.add_config(
  184. {'$usecp': self.server.hostname + ':' + sub_dir
  185. + ' ' + mapping_dir})
  186. self.mom.restart()
  187. jid = self.server.submit(j, submit_dir=sub_dir)
  188. self.logger.info(self.msg)
  189. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  190. file_count = len([name for name in os.listdir(
  191. mapping_dir) if os.path.isfile(os.path.join(mapping_dir, name))])
  192. self.assertEqual(2, file_count)
  193. self.server.expect(JOB, {ATTR_k: 'doe'}, id=jid)
  194. def test_direct_write_without_config_entry(self):
  195. """
  196. submit a sleep job and make sure that the std_files
  197. is directly written to the submission directory when it is
  198. accessible from mom and direct_files option is used
  199. but submission directory is not mapped in mom config file.
  200. """
  201. j = Job(TEST_USER, attrs={ATTR_k: 'doe'})
  202. j.set_sleep_time(10)
  203. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  204. jid = self.server.submit(j, submit_dir=sub_dir)
  205. self.logger.info(self.msg)
  206. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  207. file_count = len([name for name in os.listdir(
  208. sub_dir) if os.path.isfile(os.path.join(sub_dir, name))])
  209. self.assertEqual(2, file_count)
  210. def test_qalter_direct_write(self):
  211. """
  212. submit a job and make sure that it in queued state.
  213. alter the job with -koed and check whether it is
  214. reflecting in qstat -f output.
  215. """
  216. mydate = int(time.time()) + 60
  217. j = Job(TEST_USER)
  218. attribs = {
  219. ATTR_a: time.strftime(
  220. '%m%d%H%M',
  221. time.localtime(
  222. float(mydate)))}
  223. j.set_attributes(attribs)
  224. jid = self.server.submit(j)
  225. attribs = {ATTR_k: 'oed'}
  226. try:
  227. self.server.alterjob(jid, attribs)
  228. if self.server.expect(JOB, {'job_state': 'W'},
  229. id=jid):
  230. self.server.expect(JOB, attribs,
  231. id=jid)
  232. except PbsAlterError as e:
  233. print str(e)
  234. def test_qalter_direct_write_error(self):
  235. """
  236. submit a job and after it starts running alter
  237. the job with -koed and check whether expected
  238. error message appears
  239. """
  240. j = Job(TEST_USER)
  241. jid = self.server.submit(j)
  242. attribs = {ATTR_k: 'oed'}
  243. self.server.expect(JOB, {'job_state': 'R'})
  244. try:
  245. self.server.alterjob(jid, attribs)
  246. except PbsAlterError as e:
  247. self.assertTrue(
  248. 'Cannot modify attribute while job running Keep_Files'
  249. in e.msg[0])
  250. def test_direct_write_qrerun(self):
  251. """
  252. submit a sleep job and make sure that the std_files
  253. are written and when a job is rerun error message
  254. in logged in mom_log that it is skipping directly
  255. written/absent spool file as files are already
  256. present on first run of the job.
  257. """
  258. self.mom.add_config({'$logevent': '0xffffffff'})
  259. j = Job(TEST_USER, attrs={ATTR_k: 'doe'})
  260. j.set_sleep_time(10)
  261. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  262. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  263. self.mom.add_config(
  264. {'$usecp': self.server.hostname + ':' + sub_dir
  265. + ' ' + mapping_dir})
  266. self.mom.restart()
  267. jid = self.server.submit(j, submit_dir=sub_dir)
  268. self.logger.info(self.msg)
  269. self.server.expect(JOB, {ATTR_k: 'doe'}, id=jid)
  270. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  271. self.server.rerunjob(jid)
  272. self.mom.log_match(
  273. "stage_file;Skipping directly written/absent spool file",
  274. max_attempts=10, interval=5)
  275. file_count = len([name for name in os.listdir(
  276. mapping_dir) if os.path.isfile(os.path.join(mapping_dir, name))])
  277. self.assertEqual(2, file_count)
  278. def test_direct_write_job_array(self):
  279. """
  280. submit a job array and make sure that the std_files
  281. is directly written to the submission directory when it is
  282. accessible from mom and direct_files option is used
  283. but submission directory is not mapped in mom config file.
  284. """
  285. a = {'resources_available.ncpus': 4}
  286. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  287. j = Job(TEST_USER, attrs={ATTR_k: 'doe', ATTR_J: '1-4'})
  288. j.set_sleep_time(10)
  289. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  290. jid = self.server.submit(j, submit_dir=sub_dir)
  291. self.server.expect(JOB, {ATTR_state: 'B'}, id=jid)
  292. self.server.expect(JOB, {ATTR_state + '=R': 4}, count=True,
  293. id=jid, extend='t')
  294. self.logger.info('checking for directly written std files')
  295. file_list = [name for name in os.listdir(
  296. sub_dir) if os.path.isfile(os.path.join(sub_dir, name))]
  297. self.assertEqual(8, len(file_list))
  298. idn = jid[:jid.find('[]')]
  299. for std in ['o', 'e']:
  300. for sub_ind in range(1, 5):
  301. f_name = 'STDIN.' + std + idn + '.' + str(sub_ind)
  302. if f_name not in file_list:
  303. raise self.failureException("std file " + f_name
  304. + " not found")
  305. def test_direct_write_job_array_custom_dir(self):
  306. """
  307. submit a job array and make sure that the files
  308. are getting directly written to the custom dir
  309. provided in -e and -o option even when -doe is set.
  310. """
  311. a = {'resources_available.ncpus': 4}
  312. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  313. tmp_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  314. a = {ATTR_e: tmp_dir, ATTR_o: tmp_dir, ATTR_k: 'doe', ATTR_J: '1-4'}
  315. j = Job(TEST_USER, attrs=a)
  316. j.set_sleep_time(10)
  317. sub_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  318. mapping_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  319. self.mom.add_config(
  320. {'$usecp': self.server.hostname + ':' + sub_dir
  321. + ' ' + mapping_dir})
  322. self.mom.restart()
  323. jid = self.server.submit(j, submit_dir=sub_dir)
  324. self.server.expect(JOB, {ATTR_state: 'B'}, id=jid)
  325. self.server.expect(JOB, {ATTR_state + '=R': 4}, count=True,
  326. id=jid, extend='t')
  327. self.logger.info('checking for directly written std files')
  328. file_list = [name for name in os.listdir(
  329. tmp_dir) if os.path.isfile(os.path.join(tmp_dir, name))]
  330. self.assertEqual(8, len(file_list))
  331. for ext in ['.OU', '.ER']:
  332. for sub_ind in range(1, 5):
  333. f_name = j.create_subjob_id(jid, sub_ind) + ext
  334. if f_name not in file_list:
  335. raise self.failureException("std file " + f_name
  336. + " not found")