123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- from . import do_support, util
- from .io import fix_relative_symlinks
- import argparse
- import copy
- import importlib
- import inspect
- import json
- import logging
- import os
- import pprint
- import re
- import string
- import sys
- import time
- from shlex import quote
# Names mentioned by EPILOG below ("Write exit-code into STATUS", "Touch DONE
# on success"). Nothing else in the visible part of this file reads them.
DONE = 'done'
STATUS = 'status'

# Default number of seconds wait_for() polls for a file; run() may override it.
TIMEOUT = 30

# The root logger is used for every message emitted by this module.
LOG = logging.getLogger()

# Text shown by argparse above and below the option list.
DESCRIPTION = """Given a JSON description, call a python-function.
"""
EPILOG = """
The JSON looks like this:
{
"inputs": {"input-name": "filename"},
"outputs": {"output-name": "output-filename (relative)"},
"bash_template_fn": "template.sh",
"parameters": {}
}
This program will run on the work host, and it will do several things:
- Run in CWD.
- Verify that inputs are available. (Wait til timeout if not.)
- Possibly, cd to tmpdir and create symlinks from inputs.
- Run the python-function.
- Its module must be available (e.g. in PYTHONPATH).
- Pass a kwd-dict of the union of inputs/outputs/parameters.
- Ignore return-value. Expect exceptions.
- Possibly, mv outputs from tmpdir to workdir.
- Write exit-code into STATUS.
- Touch DONE on success.
"""

# (Someday, we might also support runnable Python modules, or even
# executables via execvp().)
# Note: qsub will *not* run this directly. There is a higher layer.
def get_parser():
    """Build the command-line parser for this program.

    Returns:
        An ``argparse.ArgumentParser`` accepting ``--timeout`` (int, defaults
        to the module-level TIMEOUT), ``--tmpdir`` (optional) and the
        positional ``json_fn``.
    """
    class _Formatter(argparse.RawDescriptionHelpFormatter,
                     argparse.ArgumentDefaultsHelpFormatter):
        """Keep description/epilog layout verbatim and append defaults."""

    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        epilog=EPILOG,
        formatter_class=_Formatter,
    )
    # The help text already names %(default)s, so the formatter does not add a
    # second "(default: ...)" suffix. The closing parenthesis was missing.
    parser.add_argument(
        '--timeout',
        type=int, default=TIMEOUT,
        help='How many seconds to wait for input files (and JSON) to exist. (default: %(default)s)')
    parser.add_argument(
        '--tmpdir',
        help='Root directory to run in. (Sub-dir name will be based on CWD.)')
    parser.add_argument(
        'json_fn',
        help='JSON file, as per epilog.')
    return parser
def wait_for(fn, timeout=None):
    """Wait for *fn* to exist, logging the filename if the wait fails.

    Args:
        fn: Path to wait for.
        timeout: Seconds to wait; ``None`` means the current module-level
            TIMEOUT (read at call time, so an override made by run() applies).

    Raises:
        Whatever ``_wait_for`` raises; the exception is logged and re-raised
        unchanged.
    """
    limit = TIMEOUT if timeout is None else timeout
    try:
        _wait_for(fn, limit)
    except BaseException:
        # Log which file we were blocked on, then let the caller see the error.
        LOG.exception('Was waiting for {!r}'.format(fn))
        raise
def _wait_for(fn, timeout):
    """Poll once per second until *fn* exists, then check it is readable.

    Args:
        fn: Path to wait for.
        timeout: Maximum number of one-second sleeps before giving up. With a
            value <= 0 the file must already exist.

    Raises:
        Exception: If the parent directory exists but cannot be entered, or
            if *fn* still does not exist once the timeout is used up.
        AssertionError: If *fn* exists but is not readable.
    """
    LOG.debug('Checking existence of {!r} with timeout={}'.format(fn, timeout))
    dirname = os.path.dirname(fn)
    # Fail fast: waiting cannot help if we are not allowed into the directory.
    if os.path.exists(dirname) and not os.access(dirname, os.X_OK):
        raise Exception('Cannot x into dir {!r}'.format(dirname))
    remaining = timeout
    while not os.path.exists(fn):
        if remaining <= 0:
            raise Exception('Timed out waiting for {!r}'.format(fn))
        time.sleep(1)
        remaining -= 1
    # Raised explicitly (same exception type as the former ``assert``) so the
    # check is not silently dropped when Python runs with -O.
    if not os.access(fn, os.R_OK):
        raise AssertionError('{!r} not readable'.format(fn))
def get_func(python_function):
    """Resolve a dotted name such as ``'pkg.mod.func'`` to the named attribute.

    Everything before the last dot is imported as a module; the part after
    it is looked up on that module.

    Args:
        python_function: String of the form ``"module.path.function_name"``.

    Returns:
        The attribute found on the imported module.

    Raises:
        ValueError: If the name has no module part or no function part.
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no such attribute.
    """
    # rpartition splits on the last dot only and has no path-separator rules,
    # unlike os.path.splitext, which is meant for filenames.
    mod_name, dot, func_name = python_function.rpartition('.')
    if not (dot and mod_name and func_name):
        raise ValueError(
            'Expected "module.function", got {!r}'.format(python_function))
    mod = importlib.import_module(mod_name)
    return getattr(mod, func_name)
class OldTaskRunner(object):
    """Attribute bag handed to old-style task functions as ``self``.

    Every input and output name becomes an attribute (outputs win over inputs
    on a name clash). The three mappings themselves are then stored as
    ``parameters``, ``inputs`` and ``outputs``, which overrides any input or
    output that happened to use one of those names.
    """

    def __init__(self, inputs, outputs, parameters):
        # Inputs first, then outputs, so a later duplicate name overwrites.
        for mapping in (inputs, outputs):
            for name, value in mapping.items():
                setattr(self, name, value)
        self.parameters = parameters
        self.inputs = inputs
        self.outputs = outputs
def run_python_func(func, inputs, outputs, parameters):
    """Call *func* the "old way", with an OldTaskRunner passed as ``self``.

    The former ``if False:`` branch, which called ``func`` with the union of
    inputs/outputs/parameters as keyword arguments, could never run and has
    been removed; only the old calling convention remains.

    Args:
        func: Callable invoked as ``func(self=runner)``.
        inputs: Mapping of input names to values.
        outputs: Mapping of output names to values.
        parameters: Mapping of parameters. NOTE: it is modified in place; the
            current working directory is stored under the key ``'cwd'``.
    """
    parameters['cwd'] = os.getcwd()
    runner = OldTaskRunner(inputs, outputs, parameters)
    func(self=runner)
    # A task function may leave a script path on the runner; if it did, hand
    # that path to do_support.run_bash.
    script_fn = getattr(runner, 'generated_script_fn', None)
    if script_fn is not None:
        do_support.run_bash(script_fn)
def run_python(python_function_name, myinputs, myoutputs, parameters):
    """Look up a function by dotted name and run it via run_python_func.

    Args:
        python_function_name: Dotted ``"module.function"`` name.
        myinputs: Mapping of input names to values.
        myoutputs: Mapping of output names to values.
        parameters: Mapping of parameters.

    Raises:
        TypeError: Re-raised from the call, after the function's signature
            has been logged to help diagnose a calling-convention mismatch.
    """
    func = get_func(python_function_name)
    try:
        run_python_func(func, myinputs, myoutputs, parameters)
    except TypeError:
        # Report the actual function spec. inspect.getargspec was removed in
        # Python 3.11, and the reporting itself must never mask the TypeError.
        try:
            spec = inspect.signature(func)
        except (TypeError, ValueError):
            spec = '<signature unavailable>'
        LOG.error('For function "{}", {}'.format(python_function_name, spec))
        raise
class Attrs(object):
    """This facilitates substitution of values in string.

    Attribute access is mapped onto the wrapped dict ``kwds``:

    - ``attrs.name`` gives the quoted string form of ``kwds['name']``.
    - ``attrs.ALL`` gives all values, sorted and space-joined.
    - a name containing ``*`` (reachable via ``getattr``) is treated as a
      glob over the keys; matching values are sorted and space-joined.

    Values are converted with ``str`` before sorting and quoting, so ints and
    floats work in every form. Unknown names raise ``AttributeError``.
    """

    def __init__(self, kwds, quote=quote):
        self.kwds = kwds
        self.quote = quote

    def __str__(self):
        # For this, all values must be strings.
        return ' '.join(self.kwds.values())

    def __getattr__(self, name):
        # Only called when normal lookup fails. Read 'kwds' from the instance
        # dict directly: going through self.kwds on an instance that has not
        # been initialised would re-enter __getattr__ forever.
        try:
            kwds = self.__dict__['kwds']
        except KeyError:
            raise AttributeError(name) from None
        # For this, values can be string, int, float, etc.
        if '*' in name:
            # Only '*' is a wildcard; any other regex metacharacter in the
            # name must match literally.
            pattern = '.*'.join(re.escape(part) for part in name.split('*'))
            re_star = re.compile('^' + pattern + '$')
            values = [v for (k, v) in kwds.items() if re_star.search(k)]
        elif 'ALL' == name:
            values = list(kwds.values())
        else:
            try:
                values = [kwds[name]]
            except KeyError:
                # The attribute protocol requires AttributeError; a KeyError
                # here breaks hasattr(), getattr(..., default) and copy.
                raise AttributeError(name) from None
        return ' '.join(self.quote(v) for v in sorted(str(v) for v in values))
def sub(bash_template, myinputs, myoutputs, parameters):
    """Fill in a bash template using ``str.format``-style fields.

    The template may refer to ``input``, ``output`` and ``params`` (the main
    values of the supported snakemake subset), each wrapped in Attrs.
    Parameters whose names start with an underscore are not exposed, and
    parameter values are substituted without shell quoting.

    Args:
        bash_template: Template text.
        myinputs: Mapping behind ``{input...}`` fields.
        myoutputs: Mapping behind ``{output...}`` fields.
        parameters: Mapping behind ``{params...}`` fields.

    Returns:
        The template with all fields substituted.
    """
    public_params = {
        key: value for key, value in parameters.items()
        if not key.startswith('_')
    }
    assert 'input' not in parameters
    assert 'output' not in parameters
    namespace = {
        'input': Attrs(myinputs),
        'output': Attrs(myoutputs),
        'params': Attrs(public_params, quote=lambda x: x),
    }
    return string.Formatter().vformat(bash_template, [], namespace)
def run_bash(bash_template, myinputs, myoutputs, parameters):
    """Render the template into 'user_script.sh' in the CWD and run it.

    The rendered task is wrapped in a header that sets bash "strict mode"
    plus -vx (like snakemake, see
    http://redsymbol.net/articles/unofficial-bash-strict-mode/) and prints
    host, directory and date, and in a footer that prints the date again.

    Args:
        bash_template: Template text passed to sub().
        myinputs: Mapping behind ``{input...}`` fields.
        myoutputs: Mapping behind ``{output...}`` fields.
        parameters: Mapping behind ``{params...}`` fields.

    Raises:
        Exception: Any substitution failure is logged with full context and
            re-raised unchanged.
    """
    # Not a raw string: Python expands the escapes, so the script receives a
    # real newline and tab between the $'...' quotes.
    header = """
IFS=$'\n\t'
set -vxeuo pipefail
hostname
pwd
date
"""
    footer = """
date
"""
    try:
        rendered = sub(bash_template, myinputs, myoutputs, parameters)
    except Exception:
        LOG.error("""\
Failed to substitute var_dict
inputs: {}
outputs: {}
parameters: {}
into bash script:
{}
Possibly you forgot to use "input.foo" "output.bar" "params.fubar" etc. in your script?
""".format(myinputs, myoutputs, parameters, bash_template))
        raise
    script_name = 'user_script.sh'
    with open(script_name, 'w') as script:
        script.write(header + rendered + footer)
    util.system('/bin/bash {}'.format(script_name))
def run_cfg_in_tmpdir(cfg, tmpdir, relpath):
    """Run the bash template described by *cfg*, optionally inside a tmpdir.

    Accept 'inputs', 'outputs', 'parameters' and 'bash_template_fn' in cfg;
    all four keys are required.
    Relative 'inputs' are resolved against relpath, and made absolute when
    running in tmpdir. ('outputs' are always relative to rundir.)

    Args:
        cfg: Dict as described above. NOTE: ``cfg['inputs']`` is rewritten in
            place with the resolved paths.
        tmpdir: Root of a scratch area, or a false value to run in the CWD.
            When set, the task runs in ``{tmpdir}/{user}/pypetmp/{cwd}`` and
            the results are copied back to the CWD with rsync afterwards.
        relpath: Directory against which relative input paths are resolved.

    Raises:
        KeyError: If a required key is missing from cfg.
        Exception: Propagated from wait_for() when an input, the template or
            an output does not appear, and from the helpers that run commands.
    """
    inputs = cfg['inputs']
    outputs = cfg['outputs']
    parameters = cfg['parameters']
    bash_template_fn = cfg['bash_template_fn']
    for k, v in list(inputs.items()):
        if not os.path.isabs(v):
            inputs[k] = os.path.normpath(os.path.join(relpath, v))
        if tmpdir:
            # We are about to chdir elsewhere, so relative paths would break.
            inputs[k] = os.path.abspath(inputs[k])
    for fn in inputs.values():
        wait_for(fn)
    wait_for(bash_template_fn)
    with open(bash_template_fn) as ifs:
        bash_template = ifs.read()
    myinputs = dict(inputs)
    myoutputs = dict(outputs)
    finaloutdir = os.getcwd()
    if tmpdir:
        import getpass
        user = getpass.getuser()
        myrundir = '{tmpdir}/{user}/pypetmp/{finaloutdir}'.format(
            tmpdir=tmpdir, user=user, finaloutdir=finaloutdir)
        # Start from an empty scratch directory.
        util.rmdirs(myrundir)
        util.mkdirs(myrundir)
        # TODO(CD): Copy inputs w/ flock.
    else:
        myrundir = finaloutdir
    with util.cd(myrundir):
        if tmpdir:
            # Check again, in case we have the paths wrong.
            for fn in inputs.values():
                wait_for(fn, 0)
        # TODO(CD): Write a script in wdir even when running in tmpdir (so we can see it on error).
        # Create user_script.sh and run it.
        run_bash(bash_template, myinputs, myoutputs, parameters)
    if tmpdir:
        # Use '&&' so the scratch directory is removed only after rsync
        # succeeded; with ';' a failed copy was followed by 'rm -rf' and the
        # shell reported rm's status, losing the outputs silently.
        # NOTE(review): presumably util.system raises on a non-zero exit
        # status -- confirm against util.
        src = quote(myrundir)
        cmd = 'rsync -av {}/ {} && rm -rf {}'.format(
            src, quote(finaloutdir), src)
        util.system(cmd)
        fix_relative_symlinks(finaloutdir, myrundir, recursive=True)
    for fn in cfg['outputs'].values():
        wait_for(fn)
def run(json_fn, timeout, tmpdir):
    """Load the task description from *json_fn* and run it in its directory.

    Args:
        json_fn: Path of the JSON file; the task runs in its directory.
        timeout: If an int, it replaces the module-level TIMEOUT used by
            later wait_for() calls; any other value leaves TIMEOUT unchanged.
        tmpdir: Passed through to run_cfg_in_tmpdir().
    """
    global TIMEOUT
    if isinstance(timeout, int):
        TIMEOUT = timeout
    wait_for(json_fn)
    LOG.debug('Loading JSON from {!r}'.format(json_fn))
    # Close the file deterministically instead of leaking the handle.
    with open(json_fn) as ifs:
        cfg = json.load(ifs)
    LOG.debug(pprint.pformat(cfg))
    # normpath turns an empty dirname (bare filename) into '.'.
    rundir = os.path.normpath(os.path.dirname(json_fn))
    with util.cd(rundir):
        run_cfg_in_tmpdir(cfg, tmpdir, '.')
def main():
    """Parse ``sys.argv`` and call run() with the parsed options.

    Any ``Exception`` from run() is logged at CRITICAL level together with
    the program name and the parsed arguments, then re-raised.
    """
    options = vars(get_parser().parse_args(sys.argv[1:]))
    try:
        run(**options)
    except Exception:
        LOG.critical('Error in {} with args={!r}'.format(
            sys.argv[0], pprint.pformat(options)))
        raise
if __name__ == "__main__":
    # Script entry point: logging is configured from the process environment
    # (every environment variable is passed as a keyword argument).
    do_support.setup_simple_logging(**os.environ)
    command_line = ' '.join(sys.argv)
    LOG.debug('Running "{}"'.format(command_line))
    main()
|