# pype.py
  1. """This was copied from falcon_unzip, but we
  2. needed to modify the TASK SCRIPT to use our copy of
  3. generic_gather.py (not used now).
  4. """
  5. import logging
  6. import os
  7. from pypeflow.simple_pwatcher_bridge import (PypeTask, Dist)
  8. from pypeflow.tasks import gen_task as pype_gen_task
  9. from pypeflow.do_task import wait_for
  10. from . import io
  11. LOG = logging.getLogger(__name__)
# Shell-script templates for pypeflow tasks. The {input.*}/{output.*}/{params.*}
# fields are substituted by the pypeflow task machinery before execution.

# Run every unit-of-work listed in a JSON file through a bash template.
TASK_GENERIC_RUN_UNITS_SCRIPT = """\
python3 -m falcon_kit.mains.generic_run_units_of_work --nproc={params.pypeflow_nproc} --units-of-work-fn={input.units_of_work} --bash-template-fn={input.bash_template} --results-fn={output.results}
"""

# Extract a single unit-of-work (selected by split_idx) from the full uow list.
TASK_GENERIC_SCATTER_ONE_UOW_SCRIPT = """\
python3 -m falcon_kit.mains.generic_scatter_one_uow --all-uow-list-fn={input.all} --one-uow-list-fn={output.one} --split-idx={params.split_idx}
"""

# Gather: combine per-task result files into one output.
# NOTE(review): unlike the other templates, this one opens with a bare '"""'
# (no '\' continuation), so the script begins with a newline -- presumably
# harmless to the shell, but confirm it is intentional.
TASK_GENERIC_UNSPLIT_SCRIPT = """
python3 -m falcon_kit.mains.generic_unsplit --result-fn-list-fn={output.result_fn_list} --gathered-fn={output.gathered}
"""

#TASK_GENERIC_CHUNKING_SCRIPT = """\
# This is done via pbtag now, I think.
#python3 -m falcon_kit.mains.generic_chunking split-fn={input.split} --bash-template-temp-fn={input.bash_template_temp} --units-of-work-fn={output.units_of_work} --uow-template-fn={output.uow_template} --split-idx={params.split_idx}
#"""
  25. def wrap_gen_task(script, inputs, outputs, rule_writer=None, parameters=None, dist=None):
  26. if parameters is None:
  27. parameters = dict()
  28. if dist is None:
  29. dist = Dist()
  30. from future.utils import viewitems
  31. rel_inputs = dict()
  32. rel_outputs = dict()
  33. # Make relative to CWD. (But better if caller does this.)
  34. def get_rel(maybe_abs):
  35. rel = dict()
  36. for (k, v) in viewitems(maybe_abs):
  37. try:
  38. if os.path.isabs(v):
  39. v = os.path.relpath(v)
  40. rel[k] = v
  41. except Exception:
  42. LOG.exception('Error for {!r}->{!r}'.format(k, v))
  43. raise
  44. return rel
  45. inputs = get_rel(inputs)
  46. outputs = get_rel(outputs)
  47. first_output_dir = os.path.normpath(os.path.dirname(list(outputs.values())[0]))
  48. rel_topdir = os.path.relpath('.', first_output_dir) # redundant for rel-inputs, but fine
  49. params = dict(parameters)
  50. params['topdir'] = rel_topdir
  51. pt = pype_gen_task(script, inputs, outputs, params, dist)
  52. # Run pype_gen_task first because it can valid some stuff.
  53. if rule_writer:
  54. rule_writer(inputs, outputs, params, script)
  55. return pt
def gen_parallel_tasks(
        wf,
        split_fn,
        gathered_fn,
        run_dict,
        rule_writer=None,
        dist=None,
        run_script=TASK_GENERIC_RUN_UNITS_SCRIPT,
):
    """
    By convention, the first (wildcard) output in run_dict['outputs'] must be the gatherable list,
    in the same format as the gathered_fn to be generated from them.
    For now, we require a single such output, since we do not yet test for wildcards.

    For each unit-of-work described in split_fn, add a scatter-task and a
    run-task to ``wf``; then add one gather-task that combines every per-job
    result into gathered_fn.
    """
    assert 'dist' not in run_dict, 'dist should be a parameter of gen_parallel_tasks(), not of its run_dict'
    if dist is None:
        dist = Dist()
    from future.utils import itervalues
    #from future.utils import viewitems
    # run_dict['inputs'] should be patterns to match the inputs in split_fn, by convention.
    #task_parameters = resolved_dict(run_dict.get('parameters', {}))
    task_parameters = run_dict.get('parameters', {})
    assert not task_parameters, 'We do not currently support the "parameters" field of a run_dict. {!r}'.format(task_parameters)
    # Write 3 wildcard rules for snakemake, 2 with dynamic.
    if rule_writer:
        rule_writer.write_dynamic_rules(
            rule_name="foo",
            input_json=split_fn,
            inputs=dict_rel_paths(run_dict['inputs']),
            shell_template=run_dict['script'],
            parameters=task_parameters,
            wildcard_outputs=dict_rel_paths(run_dict['outputs']),
            output_json=gathered_fn,
        )
    #outputs = {k:patt.format(**jobkv) for k,patt in output_patterns}
    #inputs = {k:patt.format(**jobkv) for k,patt in input_patterns}
    #inputs['SPLIT'] = split_fn # presumably ignored by script; might not be needed at all
    #split_fn = scatter_dict['outputs']['split'] # by convention
    # Flush the workflow so split_fn exists before we block on it and read it.
    wf.refreshTargets()
    max_jobs = wf.max_jobs  # saved so it can be restored at the end
    wait_for(split_fn)
    split = io.deserialize(split_fn)
    bash_template_fn = run_dict['bash_template_fn']

    def find_wildcard_input(inputs):
        # Return the first input pattern that contains a '{' wildcard;
        # the for/else raises only when the loop finds none.
        for k, v in list(inputs.items()):
            if '{' in v:
                return v
        else:
            raise Exception('No wildcard inputs among {!r}'.format(inputs))

    LOG.debug('PARALLEL OUTPUTS:{}'.format(run_dict['outputs']))
    task_results = dict()
    for split_idx, job in enumerate(split):
        #inputs = job['input']
        #outputs = job['output']
        #params = job['params']
        #wildcards = job['wildcards']
        #params.update({k: v for (k, v) in viewitems(job['wildcards'])}) # include expanded wildcards
        #LOG.warning('OUT:{}'.format(outputs))
        wildcards = job['wildcards']

        def resolved(v):
            # Substitute this job's wildcards into a single pattern.
            return v.format(**wildcards)

        def resolved_dict(d):
            # Substitute this job's wildcards into every value of d (copy; d unchanged).
            result = dict(d)
            LOG.debug(' wildcards={!r}'.format(wildcards))
            for k, v in list(d.items()):
                LOG.debug(' k={}, v={!r}'.format(k, v))
                result[k] = v.format(**wildcards)
            return result

        #task_inputs = resolved_dict(run_dict['inputs'])
        task_outputs = resolved_dict(run_dict['outputs'])
        wild_input = find_wildcard_input(run_dict['inputs'])
        one_uow_fn = os.path.abspath(wild_input.format(**wildcards))
        # Scatter: extract this job's single unit-of-work from the full list.
        wf.addTask(pype_gen_task(
            script=TASK_GENERIC_SCATTER_ONE_UOW_SCRIPT,
            inputs={
                'all': split_fn,
            },
            outputs={
                'one': one_uow_fn,
            },
            parameters={
                'split_idx': split_idx,
            },
            dist=Dist(local=True, use_tmpdir=False),
        ))
        # Run: execute this unit-of-work via the bash template.
        wf.addTask(pype_gen_task(
            script=run_script,  # usually TASK_GENERIC_RUN_UNITS_SCRIPT, unless individual load-time is slow
            inputs={
                'units_of_work': one_uow_fn,
                'bash_template': bash_template_fn,
            },
            outputs=task_outputs,  # TASK_GENERIC_RUN_UNITS_SCRIPT expects only 1, called 'results'
            parameters={},  # some are substituted from 'dist'
            dist=dist,
        ))
        wildcards_str = '_'.join(w for w in itervalues(job['wildcards']))
        job_name = 'job{}'.format(wildcards_str)
        # By convention (see docstring) the first output is the gatherable one.
        task_results[job_name] = list(task_outputs.values())[0]
    gather_inputs = dict(task_results)
    ## An implicit "gatherer" simply takes the output filenames and combines their contents.
    gathered_dn = os.path.dirname(gathered_fn)
    result_fn_list_fn = os.path.join(gathered_dn, 'result-fn-list.json')
    # Dump (with rel-paths) into next task-dir before next task starts.
    io.serialize(result_fn_list_fn, [os.path.relpath(v, gathered_dn) for v in list(task_results.values())])
    #assert 'result_fn_list' not in gather_inputs
    #gather_inputs['result_fn_list'] = result_fn_list_fn # No! pseudo output, since it must exist in a known directory
    LOG.debug('gather_inputs:{!r}'.format(gather_inputs))
    # Gather: combine every per-job result into gathered_fn.
    wf.addTask(pype_gen_task(
        script=TASK_GENERIC_UNSPLIT_SCRIPT,
        inputs=gather_inputs,
        outputs={
            'gathered': gathered_fn,
            'result_fn_list': result_fn_list_fn,
        },
        parameters={},
        dist=Dist(local=True, use_tmpdir=False),
    ))
    # Run the parallel tasks under the job-limit from 'dist', then restore it.
    wf.max_jobs = dist.job_dict.get('njobs', max_jobs)
    wf.refreshTargets()
    wf.max_jobs = max_jobs
  176. def dict_rel_paths(dict_paths):
  177. from future.utils import viewitems
  178. return {k: os.path.relpath(v) for (k, v) in viewitems(dict_paths)}