# consensus_split.py
  1. from future.utils import viewitems
  2. import argparse
  3. import collections
  4. import logging
  5. import os
  6. import string
  7. import sys
  8. from .. import io
  9. from .. import bash
  10. from .. import pype_tasks
  11. LOG = logging.getLogger()
  12. def corrected_relpath(p, was_rel_to):
  13. if os.path.isabs(p):
  14. return p
  15. #LOG.warning('{},{},{}'.format(p, was_rel_to, os.path.relpath(os.path.join(was_rel_to, p))))
  16. return os.path.normpath(os.path.relpath(os.path.join(was_rel_to, p)))
  17. def read_gathered_las(path):
  18. """Return dict of block->[las_paths].
  19. For now, these are ws separated on each line of input.
  20. """
  21. result = collections.defaultdict(list)
  22. dn = os.path.normpath(os.path.dirname(path))
  23. p_id2las = io.deserialize(path)
  24. for block, las_path in list(p_id2las.items()):
  25. result[int(block)].append(corrected_relpath(las_path, dn))
  26. #import pprint
  27. #LOG.warning('path={!r}, result={}'.format(
  28. # path, pprint.pformat(result)))
  29. return result
  30. def pre_hook(config_fn, db_fn):
  31. config = io.deserialize(config_fn)
  32. hook = config.get('LA4Falcon_pre')
  33. if hook:
  34. LOG.warning('Found LA4Falcon_pre in General section of cfg. About to run {!r}...'.format(hook))
  35. if config.get('LA4Falcon_preload'):
  36. LOG.error('Found both LA4Falcon_pre and LA4Falcon_preload. Why would you preload after you have copied the DB? I hope you know what you are doing.')
  37. db = os.path.abspath(db_fn)
  38. parent = os.path.abspath(os.path.dirname(os.getcwd()))
  39. dbdir = os.path.join(config['LA4Falcon_dbdir'], 'fc-db') + parent
  40. cmd = string.Template(hook).substitute(DB=db, DBDIR=dbdir)
  41. io.syscall(cmd)
  42. def run(p_id2las_fn, db_fn, length_cutoff_fn, config_fn, wildcards,
  43. bash_template_fn, split_fn):
  44. with open(bash_template_fn, 'w') as stream:
  45. stream.write(pype_tasks.TASK_CONSENSUS_TASK_SCRIPT)
  46. db_fn = os.path.realpath(db_fn)
  47. # Because DazzlerDB is not a "FileType" in pbcommand,
  48. # it might be a symlink with a weird extension.
  49. LOG.info('Scattering las from {!r} (based on {!r}) into {!r}.'.format(
  50. p_id2las_fn, db_fn, split_fn))
  51. wildcards = wildcards.split(',')
  52. #basedir = os.path.dirname(os.path.abspath(split_fn))
  53. #rootdir = os.path.dirname(os.path.dirname(basedir)) # for now
  54. outdir = os.path.abspath(os.path.dirname(split_fn))
  55. jobs = list()
  56. p_ids_merge_las = read_gathered_las(p_id2las_fn)
  57. tasks = []
  58. for (p_id, las_fns) in viewitems(p_ids_merge_las):
  59. assert len(las_fns) == 1, repr(las_fns)
  60. # since we know each merge-task is for a single block
  61. las_fn = las_fns[0]
  62. cns_id = 'cns_%05d' % int(p_id)
  63. cns_id2 = cns_id
  64. ##out_done_fn = '%s_done' % cns_label
  65. #out_file_fn = '%s.fasta' % cns_label
  66. #symlinked_las_fn = '{rootdir}/0-rawreads/cns-split/{cns_id}/merged.{cns_id2}.las'.format(**locals())
  67. symlinked_las_fn = '{outdir}/cns-symlinks/{cns_id}/merged.{cns_id2}.las'.format(**locals())
  68. io.mkdirs(os.path.normpath(os.path.dirname(symlinked_las_fn)))
  69. src = os.path.relpath(las_fn,
  70. os.path.normpath(os.path.dirname(symlinked_las_fn)))
  71. io.symlink(src, symlinked_las_fn)
  72. # Record in a job-dict.
  73. job = dict()
  74. job['input'] = dict(
  75. las = symlinked_las_fn,
  76. db = db_fn,
  77. length_cutoff = length_cutoff_fn,
  78. config = config_fn,
  79. )
  80. job['output'] = dict(
  81. fasta = 'consensus.{cns_id2}.fasta'.format(**locals()),
  82. #'{rootdir}/0-rawreads/consensus/{cns_id}/consensus.{cns_id2}.fasta'.format(**locals()),
  83. )
  84. job['params'] = dict(
  85. )
  86. job['wildcards'] = {wildcards[0]: cns_id, wildcards[1]: cns_id}
  87. jobs.append(job)
  88. io.serialize(split_fn, jobs)
  89. pre_hook(config_fn, db_fn)
  90. class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
  91. pass
  92. def parse_args(argv):
  93. description = 'Prepare for parallel consensus jobs.'
  94. epilog = ''
  95. parser = argparse.ArgumentParser(
  96. description=description,
  97. epilog=epilog,
  98. formatter_class=HelpF,
  99. )
  100. parser.add_argument(
  101. '--p-id2las-fn',
  102. help='Input. JSON dict of p-id to las.)')
  103. parser.add_argument(
  104. '--db-fn',
  105. help='Input. Dazzler DB of raw_reads.')
  106. parser.add_argument(
  107. '--length-cutoff-fn',
  108. help='Input. Contains a single integer, the length-cutoff.')
  109. parser.add_argument(
  110. '--config-fn',
  111. help='Input. JSON of relevant configuration (currently from General section of full-prog config).')
  112. parser.add_argument(
  113. '--wildcards',
  114. help='Input. Comma-separated wildcard names. Might be needed downstream.')
  115. parser.add_argument(
  116. '--split-fn',
  117. help='Output. JSON list of jobs, where each is a dict of input/output/params/wildcards.')
  118. parser.add_argument(
  119. '--bash-template-fn',
  120. help='Output. Copy of known daligner bash template, for use later.')
  121. args = parser.parse_args(argv[1:])
  122. return args
  123. def main(argv=sys.argv):
  124. args = parse_args(argv)
  125. logging.basicConfig(level=logging.INFO)
  126. run(**vars(args))
  127. if __name__ == '__main__': # pragma: no cover
  128. main()