do_task.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. from . import do_support, util
  2. from .io import fix_relative_symlinks
  3. import argparse
  4. import copy
  5. import importlib
  6. import inspect
  7. import json
  8. import logging
  9. import os
  10. import pprint
  11. import re
  12. import string
  13. import sys
  14. import time
  15. from shlex import quote
# Names that the EPILOG text below refers to: the marker touched on success
# ("Touch DONE") and the file receiving the exit-code ("Write exit-code into STATUS").
DONE = 'done'
STATUS = 'status'
# Default number of seconds wait_for() allows for a file to appear.
# run() overwrites this global when it is given an integer timeout.
TIMEOUT = 30
# getLogger() is called without a name, so this is the root logger.
LOG = logging.getLogger()
# Used as the argparse description in get_parser().
DESCRIPTION = """Given a JSON description, call a python-function.
"""
# Used as the argparse epilog in get_parser(); printed after the argument help.
EPILOG = """
The JSON looks like this:
{
"inputs": {"input-name": "filename"},
"outputs": {"output-name": "output-filename (relative)"},
"bash_template_fn": "template.sh",
"parameters": {}
}
This program will run on the work host, and it will do several things:
- Run in CWD.
- Verify that inputs are available. (Wait til timeout if not.)
- Possibly, cd to tmpdir and create symlinks from inputs.
- Run the python-function.
- Its module must be available (e.g. in PYTHONPATH).
- Pass a kwd-dict of the union of inputs/outputs/parameters.
- Ignore return-value. Expect exceptions.
- Possibly, mv outputs from tmpdir to workdir.
- Write exit-code into STATUS.
- Touch DONE on success.
"""
# The bare string below is not bound to any name; it is only a note for maintainers.
"""
(Someday, we might also support runnable Python modules, or even executables via execvp().)
Note: qsub will *not* run this directly. There is a higher layer.
"""
  46. def get_parser():
  47. class _Formatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
  48. pass
  49. parser = argparse.ArgumentParser(description=DESCRIPTION, epilog=EPILOG,
  50. formatter_class=_Formatter,
  51. )
  52. parser.add_argument('--timeout',
  53. type=int, default=TIMEOUT,
  54. help='How many seconds to wait for input files (and JSON) to exist. (default: %(default)s')
  55. parser.add_argument('--tmpdir',
  56. help='Root directory to run in. (Sub-dir name will be based on CWD.)')
  57. parser.add_argument('json_fn',
  58. help='JSON file, as per epilog.')
  59. return parser
  60. def wait_for(fn, timeout=None):
  61. if timeout is None:
  62. global TIMEOUT
  63. timeout = copy.copy(TIMEOUT) # just to be clear
  64. try:
  65. _wait_for(fn, timeout)
  66. except BaseException:
  67. LOG.exception('Was waiting for {!r}'.format(fn))
  68. raise
  69. def _wait_for(fn, timeout):
  70. LOG.debug('Checking existence of {!r} with timeout={}'.format(fn, timeout))
  71. dirname = os.path.dirname(fn)
  72. if os.path.exists(dirname):
  73. if not os.access(dirname, os.X_OK):
  74. raise Exception('Cannot x into dir {!r}'.format(dirname))
  75. while not os.path.exists(fn):
  76. if timeout > 0:
  77. time.sleep(1)
  78. timeout -= 1
  79. else:
  80. raise Exception('Timed out waiting for {!r}'.format(fn))
  81. assert os.access(fn, os.R_OK), '{!r} not readable'.format(fn)
  82. def get_func(python_function):
  83. mod_name, func_name = os.path.splitext(python_function)
  84. func_name = func_name[1:] # skip dot
  85. mod = importlib.import_module(mod_name)
  86. func = getattr(mod, func_name)
  87. return func
  88. class OldTaskRunner(object):
  89. def __init__(self, inputs, outputs, parameters):
  90. for k,v in (list(inputs.items()) + list(outputs.items())):
  91. setattr(self, k, v)
  92. self.parameters = parameters
  93. self.inputs = inputs
  94. self.outputs = outputs
  95. def run_python_func(func, inputs, outputs, parameters):
  96. if False:
  97. kwds = dict()
  98. kwds.update(inputs)
  99. kwds.update(outputs)
  100. kwds.update(parameters)
  101. func(**kwds)
  102. else:
  103. # old way, for now
  104. cwd = os.getcwd()
  105. parameters['cwd'] = cwd
  106. self = OldTaskRunner(inputs, outputs, parameters)
  107. func(self=self)
  108. script_fn = getattr(self, 'generated_script_fn', None)
  109. if script_fn is not None:
  110. do_support.run_bash(script_fn)
  111. def run_python(python_function_name, myinputs, myoutputs, parameters):
  112. func = get_func(python_function_name)
  113. try:
  114. run_python_func(func, myinputs, myoutputs, parameters)
  115. except TypeError:
  116. # Report the actual function spec.
  117. LOG.error('For function "{}", {}'.format(python_function_name, inspect.getargspec(func)))
  118. raise
  119. class Attrs(object):
  120. """This facilitates substitution of values in string.
  121. """
  122. def __str__(self):
  123. # For this, all values must be strings.
  124. return ' '.join(f for f in self.kwds.values())
  125. def __getattr__(self, name):
  126. # For this, values can be string, int, float, etc.
  127. if '*' in name:
  128. re_star = re.compile('^' + name.replace('*', '.*') + '$')
  129. result = (v for (k,v) in self.kwds.items() if re_star.search(k))
  130. elif 'ALL' == name:
  131. result = iter(self.kwds.values())
  132. else:
  133. result = [str(self.kwds[name])]
  134. return ' '.join(self.quote(v) for v in sorted(result))
  135. def __init__(self, kwds, quote=quote):
  136. self.kwds = kwds
  137. self.quote = quote
  138. def sub(bash_template, myinputs, myoutputs, parameters):
  139. # Set substitution dict
  140. var_dict = dict()
  141. valid_parameters = {k:v for k,v in parameters.items() if not k.startswith('_')}
  142. assert 'input' not in parameters
  143. assert 'output' not in parameters
  144. # input/output/params are the main values substituted in the subset of
  145. # snakemake which we support.
  146. var_dict['input'] = Attrs(myinputs)
  147. var_dict['output'] = Attrs(myoutputs)
  148. var_dict['params'] = Attrs(valid_parameters, quote=lambda x:x)
  149. fmtr = string.Formatter()
  150. return fmtr.vformat(bash_template, [], var_dict)
  151. def run_bash(bash_template, myinputs, myoutputs, parameters):
  152. # Like snakemake, we use bash "strict mode", but we add -vx.
  153. # http://redsymbol.net/articles/unofficial-bash-strict-mode/
  154. prefix = """
  155. IFS=$'\n\t'
  156. set -vxeuo pipefail
  157. hostname
  158. pwd
  159. date
  160. """
  161. # Substitute
  162. try:
  163. task_lines = sub(bash_template, myinputs, myoutputs, parameters)
  164. except Exception:
  165. msg = """\
  166. Failed to substitute var_dict
  167. inputs: {}
  168. outputs: {}
  169. parameters: {}
  170. into bash script:
  171. {}
  172. Possibly you forgot to use "input.foo" "output.bar" "params.fubar" etc. in your script?
  173. """.format(myinputs, myoutputs, parameters, bash_template)
  174. LOG.error(msg)
  175. raise
  176. postfix = """
  177. date
  178. """
  179. # Combine
  180. bash_content = prefix + task_lines + postfix
  181. # Write user_script.sh
  182. bash_fn = 'user_script.sh'
  183. with open(bash_fn, 'w') as ofs:
  184. ofs.write(bash_content)
  185. cmd = '/bin/bash {}'.format(bash_fn)
  186. util.system(cmd)
  187. def run_cfg_in_tmpdir(cfg, tmpdir, relpath):
  188. """
  189. Accept 'inputs', 'outputs', 'parameters' in cfg.
  190. Relativize 'inputs' relative to relpath, unless running in tmpdir.
  191. ('outputs' are always relative to rundir.)
  192. If 'bash_template_fn' in cfg, then substitute and use it.
  193. """
  194. inputs = cfg['inputs']
  195. outputs = cfg['outputs']
  196. parameters = cfg['parameters']
  197. bash_template_fn = cfg['bash_template_fn']
  198. for k,v in list(inputs.items()):
  199. if not os.path.isabs(v):
  200. inputs[k] = os.path.normpath(os.path.join(relpath, v))
  201. if tmpdir:
  202. inputs[k] = os.path.abspath(inputs[k])
  203. for fn in inputs.values():
  204. wait_for(fn)
  205. wait_for(bash_template_fn)
  206. bash_template = open(bash_template_fn).read()
  207. myinputs = dict(inputs)
  208. myoutputs = dict(outputs)
  209. finaloutdir = os.getcwd()
  210. if tmpdir:
  211. import getpass
  212. user = getpass.getuser()
  213. pid = os.getpid()
  214. myrundir = '{tmpdir}/{user}/pypetmp/{finaloutdir}'.format(**locals())
  215. util.rmdirs(myrundir)
  216. util.mkdirs(myrundir)
  217. # TODO(CD): Copy inputs w/ flock.
  218. else:
  219. myrundir = finaloutdir
  220. with util.cd(myrundir):
  221. if tmpdir:
  222. # Check again, in case we have the paths wrong.
  223. for fn in inputs.values():
  224. wait_for(fn, 0)
  225. # TODO(CD): Write a script in wdir even when running in tmpdir (so we can see it on error).
  226. run_bash(bash_template, myinputs, myoutputs, parameters)#create user_script.sh and run it
  227. if tmpdir:
  228. """
  229. for k,v in outputs.iteritems():
  230. cmd = 'mv -f {} {}'.format(
  231. os.path.join(myrundir, v),
  232. os.path.join(finaloutdir, v))
  233. util.system(cmd)
  234. """
  235. cmd = 'rsync -av {}/ {}; rm -rf {}'.format(myrundir, finaloutdir, myrundir)
  236. util.system(cmd)
  237. fix_relative_symlinks(finaloutdir, myrundir, recursive=True)
  238. for fn in cfg['outputs'].values():
  239. wait_for(fn)
  240. def run(json_fn, timeout, tmpdir):
  241. if isinstance(timeout, int):
  242. global TIMEOUT
  243. TIMEOUT = timeout
  244. wait_for(json_fn)
  245. LOG.debug('Loading JSON from {!r}'.format(json_fn))
  246. cfg = json.loads(open(json_fn).read())
  247. LOG.debug(pprint.pformat(cfg))
  248. rundir = os.path.normpath(os.path.dirname(json_fn))
  249. with util.cd(rundir):
  250. run_cfg_in_tmpdir(cfg, tmpdir, '.')
  251. def main():
  252. parser = get_parser()
  253. parsed_args = parser.parse_args(sys.argv[1:])
  254. try:
  255. run(**vars(parsed_args))
  256. except Exception:
  257. LOG.critical('Error in {} with args={!r}'.format(sys.argv[0], pprint.pformat(vars(parsed_args))))
  258. raise
  259. if __name__ == "__main__":
  260. do_support.setup_simple_logging(**os.environ)
  261. LOG.debug('Running "{}"'.format(' '.join(sys.argv)))
  262. main()