bam2dexta.py

import argparse
import collections
import glob
import logging
import os
import re
import sys
import time
from .. import io, functional
from .. import (
    bash,  # for write_sub_script
    pype_tasks,  # for TASKS
)

LOG = logging.getLogger()
WAIT = 20  # seconds
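
# Overview (descriptive note): this module implements a small split/apply/combine
# pattern.  'split' serializes a list of units-of-work (the default split-fn is a
# .json file) and writes a bash template, 'apply' runs bam2fasta|dexta for one
# unit-of-work, and 'combine' gathers the resulting .dexta paths into a FOFN.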

def bam2dexta_split(bam_subreadset_fn, wildcards, split_fn, bash_template_fn):
    assert bam_subreadset_fn.endswith('.xml')
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_BAM2DEXTA_APPLY_SCRIPT)
    split_dataset_prefix = os.path.join(os.getcwd(), 'split')  # TODO: Test this as relative sub-dir.
    #from ..util import dataset_split  # introduces pbcore dependency
    #bam_paths = dataset_split.split_dataset(bam_subreadset_fn, split_dataset_prefix)
    bam_paths = [bam_subreadset_fn]  # Lose parallelism, but avoid pbcore.
    jobs = list()
    for i, bam_fn in enumerate(bam_paths):
        job_id = 'b_{:03d}'.format(i)
        # (Disabled) Write the per-job input paths for this job.
        #input_dir = os.path.join('bam2dexta-scripts', job_id)
        #bam_paths_fn = os.path.join('.', input_dir, 'bam-paths.json')
        #io.mkdirs(input_dir)
        #io.serialize(bam_paths_fn, bam_paths)
        # Record in a job-dict.
        dexta_fn = 'subreads.{}.dexta'.format(job_id)
        job = dict()
        job['input'] = dict(
            bam=bam_fn,
        )
        job['output'] = dict(
            dexta=dexta_fn,
        )
        job['params'] = dict(
        )
        job['wildcards'] = {wildcards: job_id}
        jobs.append(job)
    io.serialize(split_fn, jobs)
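
# For illustration only (the path under "input.bam" is hypothetical; the rest follows
# directly from bam2dexta_split above): with the single-dataset fallback, the
# serialized split-fn holds a one-element list shaped like
#   [
#     {"input": {"bam": "/path/to/subreads.subreadset.xml"},
#      "output": {"dexta": "subreads.b_000.dexta"},
#      "params": {},
#      "wildcards": {"bam2dexta0_id": "b_000"}}
#   ]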

def bam2dexta_apply(bam_fn, dexta_fn):
    """Given a bam subread DataSet, write a .dexta file.
    """
    io.rm_force(dexta_fn)
    tmpdir = '.'  # There is no significant improvement to running on local disk.
    cmd = 'rm -f {dexta_fn}; ls -larth {tmpdir}; (bam2fasta -u -o - {bam_fn} | dexta -i >| {tmpdir}/foo.dexta); mv -f {tmpdir}/foo.dexta {dexta_fn}'.format(
        **locals())
    # Note: If 'dexta' fails, the script will error. So we might still have an empty foo.dexta, but
    # we will not have moved it to {dexta_fn}.
    io.syscall(cmd)
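
# Worked example (file names are hypothetical): with bam_fn='in.subreadset.xml' and
# dexta_fn='out.dexta', the command above expands to
#   rm -f out.dexta; ls -larth .; (bam2fasta -u -o - in.subreadset.xml | dexta -i >| ./foo.dexta); mv -f ./foo.dexta out.dexta
# i.e. bam2fasta streams FASTA into dexta, which writes a scratch foo.dexta; the
# final mv publishes the result to dexta_fn only if the pipeline succeeded.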

def bam2dexta_combine(gathered_fn, dexta_fofn_fn):
    gathered = io.deserialize(gathered_fn)
    d = os.path.abspath(os.path.realpath(os.path.dirname(gathered_fn)))

    def abspath(fn):
        if os.path.isabs(fn):
            return fn  # I expect this never to happen though.
        return os.path.join(d, fn)

    dexta_fns = list()
    for job_output in gathered:
        assert len(job_output) == 1, 'len(job_output) == {} != 1'.format(len(job_output))
        for fn in list(job_output.values()):
            abs_fn = abspath(fn)
            dexta_fns.append(abs_fn)
    dexta_paths = list()
    for dexta_fn in sorted(dexta_fns):
        if not os.path.exists(dexta_fn):
            msg = 'Did not find {!r}. Waiting {} seconds.'.format(dexta_fn, WAIT)
            LOG.info(msg)
            time.sleep(WAIT)
            if not os.path.exists(dexta_fn):
                msg = 'Did not find {!r}, even after waiting {} seconds. Maybe retry later?'.format(dexta_fn, WAIT)
                raise Exception(msg)
        dexta_paths.append(dexta_fn)
    # Serialize result.
    #io.serialize(dexta_paths_fn, sorted(dexta_paths))
    with open(dexta_fofn_fn, 'w') as stream:
        stream.write('\n'.join(dexta_paths))
        stream.write('\n')
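
# Data shapes, for illustration (file names are hypothetical): gathered_fn typically
# deserializes to a list of one-entry dicts such as
#   [{"dexta": "job_b_000/subreads.b_000.dexta"}, {"dexta": "job_b_001/subreads.b_001.dexta"}]
# Relative paths are resolved against the directory of gathered_fn, each file is
# polled once more after WAIT seconds if missing, and dexta_fofn_fn ends up as a
# newline-terminated list of the absolute .dexta paths in sorted order.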

def setup_logging(log_level):
    hdlr = logging.StreamHandler(sys.stderr)
    hdlr.setLevel(log_level)
    hdlr.setFormatter(logging.Formatter('[%(levelname)s]%(message)s'))
    LOG.addHandler(hdlr)
    LOG.setLevel(logging.NOTSET)
    LOG.info('Log-level: {}'.format(log_level))

def cmd_split(args):
    bam2dexta_split(
        args.bam_subreadset_fn,
        args.wildcards,
        args.split_fn, args.bash_template_fn,
    )

def cmd_apply(args):
    bam2dexta_apply(args.bam_fn, args.dexta_fn)

def cmd_combine(args):
    bam2dexta_combine(args.gathered_fn, args.dexta_fofn_fn)

#def get_ours(config_fn):
#    ours = dict()
#    config = io.deserialize(config_fn)
#    LOG.info('config({!r}):\n{}'.format(config_fn, config))
#    LOG.info('our subset of config:\n{}'.format(ours))
#    return ours

def add_split_arguments(parser):
    parser.add_argument(
        '--wildcards', default='bam2dexta0_id',
        help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
    )
    parser.add_argument(
        '--bam-subreadset-fn',
        help='input. Dataset (.xml) of bam files of subreads.',
    )
    parser.add_argument(
        '--split-fn', default='bam2dexta-uows.json',
        help='output. Units-of-work for bam2fasta/dexta.',
    )
    parser.add_argument(
        '--bash-template-fn', default='bash-template.sh',
        help='output. Script to apply later.',
    )

def add_apply_arguments(parser):
    parser.add_argument(
        '--bam-fn', required=True,
        help='input. bam or dataset')
    parser.add_argument(
        '--dexta-fn', required=True,
        help='output. The dazzler (Gene Myers) dexta-file.',
    )

def add_combine_arguments(parser):
    parser.add_argument(
        '--gathered-fn', required=True,
        help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The .dexta files are next to these.',
    )
    parser.add_argument(
        '--dexta-fofn-fn', required=True,
        help='output. FOFN of dexta paths.')

class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass

def parse_args(argv):
    description = 'Efficiently generate .dexta from BAM or subread datasets.'
    epilog = 'For more details on .dexta, see https://dazzlerblog.wordpress.com/command-guides/dextractor-command-guide/'
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=HelpF,
    )
    parser.add_argument(
        '--log-level', default='INFO',
        help='Python logging level.',
    )
    parser.add_argument(
        '--nproc', type=int, default=0,
        help='ignored for now, but non-zero will mean "No more than this."',
    )
    help_split = 'split the input into one unit-of-work per bam-file (or subread dataset file)'
    help_apply = 'run bam2fasta and dexta as a unit-of-work'
    help_combine = 'generate a file of .dexta files'

    subparsers = parser.add_subparsers(help='sub-command help')

    parser_split = subparsers.add_parser('split',
        formatter_class=HelpF,
        description=help_split,
        epilog='',
        help=help_split)
    add_split_arguments(parser_split)
    parser_split.set_defaults(func=cmd_split)

    parser_apply = subparsers.add_parser('apply',
        formatter_class=HelpF,
        description=help_apply,
        epilog='',
        help=help_apply)
    add_apply_arguments(parser_apply)
    parser_apply.set_defaults(func=cmd_apply)

    parser_combine = subparsers.add_parser('combine',
        formatter_class=HelpF,
        description=help_combine,
        epilog='',
        help=help_combine)
    add_combine_arguments(parser_combine)
    parser_combine.set_defaults(func=cmd_combine)

    args = parser.parse_args(argv[1:])
    return args

def main(argv=sys.argv):
    args = parse_args(argv)
    setup_logging(args.log_level)
    args.func(args)

if __name__ == '__main__':  # pragma: no cover
    main()
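
# Example invocations (illustrative only; the actual entry point wrapping main()
# depends on how the package installs this module, and the file names are made up):
#   <entry-point> split   --bam-subreadset-fn subreads.subreadset.xml
#   <entry-point> apply   --bam-fn subreads.subreadset.xml --dexta-fn subreads.b_000.dexta
#   <entry-point> combine --gathered-fn gathered.json --dexta-fofn-fn input.fofn
# Global options such as --log-level and --nproc go before the sub-command name.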