tasks.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import collections
  2. import logging
  3. import os
  4. import pprint
  5. from .simple_pwatcher_bridge import (PypeTask, Dist)
  6. from . import io
# Module-level logger: gen_task emits its debug traces through it, and the
# task-dict key validation logs bad key names through it before re-raising.
LOG = logging.getLogger(__name__)
  8. def task_generic_bash_script(self):
  9. """Generic script task.
  10. The script template should be in
  11. self.bash_template
  12. The template will be substituted by
  13. the content of "self" and of "self.parameters".
  14. (That is a little messy, but good enough for now.)
  15. """
  16. self_dict = dict()
  17. self_dict.update(self.__dict__)
  18. self_dict.update(self.parameters)
  19. script_unsub = self.bash_template
  20. script = script_unsub.format(**self_dict)
  21. script_fn = 'script.sh'
  22. with open(script_fn, 'w') as ofs:
  23. ofs.write(script)
  24. self.generated_script_fn = script_fn
  25. def gen_task(script, inputs, outputs, parameters=None, dist=None):
  26. """
  27. dist is used in two ways:
  28. 1) in the pwatcher, to control job-distribution
  29. 2) as additional parameters:
  30. - params.pypeflow_nproc
  31. - params.pypeflow_mb
  32. """
  33. if parameters is None:
  34. parameters = dict()
  35. if dist is None:
  36. dist = Dist()
  37. LOG.debug('gen_task({}\n\tinputs={!r},\n\toutputs={!r})'.format(
  38. script, inputs, outputs))
  39. parameters = dict(parameters) # copy
  40. parameters['pypeflow_nproc'] = dist.pypeflow_nproc
  41. parameters['pypeflow_mb'] = dist.pypeflow_mb
  42. LOG.debug(' parameters={}'.format(
  43. pprint.pformat(parameters)))
  44. LOG.debug(' dist.job_dict={}'.format(
  45. pprint.pformat(dist.job_dict)))
  46. def validate_dict(mydict):
  47. "Python identifiers are illegal as keys."
  48. try:
  49. collections.namedtuple('validate', list(mydict.keys()))
  50. except ValueError as exc:
  51. LOG.exception('Bad key name in task definition dict {!r}'.format(mydict))
  52. raise
  53. validate_dict(inputs)
  54. validate_dict(outputs)
  55. validate_dict(parameters)
  56. make_task = PypeTask(
  57. inputs={k: v for k,v in inputs.items()},
  58. outputs={k: v for k,v in outputs.items()},
  59. parameters=parameters,
  60. bash_template=script,
  61. dist=dist,
  62. )
  63. return make_task(task_generic_bash_script)