1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- """
- This must not run in a tmpdir. The 'inputs' paths will
- end up relative to the rundir.
- """
- import argparse
- import collections
- import glob
- import logging
- import os
- import sys
- from .. import io
- LOG = logging.getLogger()
- # Here is some stuff basically copied from pypeflow.sample_tasks.py.
- def validate(bash_template, inputs, outputs, parameterss):
- LOG.info('bash_script_from_template({}\n\tinputs={!r},\n\toutputs={!r})'.format(
- bash_template, inputs, outputs))
- def validate_dict(mydict):
- "Python identifiers are illegal as keys."
- try:
- collections.namedtuple('validate', list(mydict.keys()))
- except ValueError as exc:
- LOG.exception('Bad key name in task definition dict {!r}'.format(mydict))
- raise
- validate_dict(inputs)
- validate_dict(outputs)
- validate_dict(parameterss)
- def run(all_uow_list_fn, split_idx, one_uow_list_fn):
- all_uows = io.deserialize(all_uow_list_fn)
- all_dn = os.path.abspath(os.path.dirname(all_uow_list_fn))
- one_dn = os.path.abspath(os.path.dirname(one_uow_list_fn))
- rel_dn = os.path.relpath(all_dn, one_dn)
- one_uow = all_uows[split_idx]
- def fixpath(rel):
- try:
- if not os.path.isabs(rel):
- return os.path.join('.', os.path.normpath(os.path.join(rel_dn, rel)))
- except Exception:
- # in case of non-string?
- pass
- return rel
- if isinstance(one_uow, dict):
- input_dict = one_uow['input']
- for k, v in list(input_dict.items()):
- input_dict[k] = fixpath(v)
- io.serialize(one_uow_list_fn, [one_uow])
- class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
- pass
- def parse_args(argv):
- description = 'Scatter a single unit-of-work from many units-of-work.'
- epilog = ''
- parser = argparse.ArgumentParser(
- description=description,
- epilog=epilog,
- formatter_class=HelpF,
- )
- parser.add_argument(
- '--all-uow-list-fn',
- help='Input. JSON list of all units of work.')
- parser.add_argument(
- '--split-idx', type=int,
- help='Input. Index into the all-uow-list for our single unit-of-work.')
- parser.add_argument(
- '--one-uow-list-fn',
- help='Output. JSON list of a single unit-of-work.')
- args = parser.parse_args(argv[1:])
- return args
- def main(argv=sys.argv):
- args = parse_args(argv)
- logging.basicConfig(level=logging.INFO)
- run(**vars(args))
- if __name__ == '__main__': # pragma: no cover
- main()
|