consensus_task.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import argparse
  2. import logging
  3. import multiprocessing
  4. import os
  5. import re
  6. import sys
  7. from .. import io
  8. from .. import bash
  9. from .dazzler import symlink_db
  10. LOG = logging.getLogger()
  11. def get_option_with_proper_nproc(regexp, opt, opt_name, nproc, cpu_count=multiprocessing.cpu_count()):
  12. r"""Return opts sans the regexp match, and proper nproc.
  13. >>> regexp = re.compile(r'-j[^\d]*(\d+)')
  14. >>> get_option_with_proper_nproc(regexp, 'foo -j 5', 'baz', nproc=7, cpu_count=6)
  15. ('foo ', 5)
  16. >>> get_option_with_proper_nproc(regexp, 'foo -j 5', 'baz', nproc=3, cpu_count=4)
  17. ('foo ', 3)
  18. >>> get_option_with_proper_nproc(regexp, 'foo -j 5', 'baz', nproc=3, cpu_count=2)
  19. ('foo ', 2)
  20. """
  21. job_nproc = int(nproc)
  22. mo = regexp.search(opt)
  23. if mo:
  24. opt_nproc = int(mo.group(1))
  25. if job_nproc < opt_nproc:
  26. LOG.warning('NPROC={}, but falcon_sense_option="{}", so we will ignore that option and use {}'.format(
  27. job_nproc, opt, job_nproc))
  28. elif job_nproc > opt_nproc:
  29. LOG.warning('NPROC={}, but falcon_sense_option="{}", so we will override NPROC and use {}'.format(
  30. job_nproc, opt, opt_nproc))
  31. nproc = min(job_nproc, opt_nproc)
  32. opt = regexp.sub('', opt) # remove --n_core, for now
  33. else:
  34. nproc = job_nproc
  35. if nproc > cpu_count:
  36. LOG.warning('Requested nproc={} > cpu_count={}; using {}'.format(
  37. nproc, cpu_count, cpu_count))
  38. nproc = cpu_count
  39. return opt, nproc
  40. def get_falcon_sense_option(opt, nproc):
  41. """
  42. >>> get_falcon_sense_option('', 11)
  43. ' --n-core=11'
  44. >>> get_falcon_sense_option('--n-core=24', 10)
  45. ' --n-core=10'
  46. """
  47. re_n_core = re.compile(r'--n-core[^\d]+(\d+)')
  48. opt, nproc = get_option_with_proper_nproc(re_n_core, opt, 'falcon_sense_option', nproc)
  49. opt += ' --n-core={}'.format(nproc)
  50. return opt
  51. def get_pa_dazcon_option(opt, nproc):
  52. """
  53. >>> get_pa_dazcon_option('', 12)
  54. ' -j 12'
  55. >>> get_pa_dazcon_option('-j 48', 13)
  56. ' -j 13'
  57. """
  58. re_j = re.compile(r'-j[^\d]+(\d+)')
  59. opt, nproc = get_option_with_proper_nproc(re_j, opt, 'pa_dazcon_option', nproc)
  60. opt += ' -j {}'.format(nproc)
  61. return opt
  62. def symlink(actual):
  63. """Symlink into cwd, without relativizing.
  64. """
  65. symbolic = os.path.basename(actual)
  66. if os.path.abspath(actual) == os.path.abspath(symbolic):
  67. LOG.warning('Cannot symlink {!r} as {!r}, itself.'.format(actual, symbolic))
  68. return
  69. rel = actual # not really relative, but this code was copy/pasted
  70. if True:
  71. LOG.info('ln -sf {} {}'.format(rel, symbolic))
  72. if os.path.lexists(symbolic):
  73. if os.readlink(symbolic) == rel:
  74. return
  75. else:
  76. os.unlink(symbolic)
  77. os.symlink(rel, symbolic)
  78. # This function was copied from bash.py and modified.
  79. def script_run_consensus(config, db_fn, las_fn, out_file_fn, nproc):
  80. """config: dazcon, falcon_sense_greedy, falcon_sense_skip_contained, LA4Falcon_preload
  81. """
  82. symlink_db(db_fn, symlink=symlink)
  83. db_fn = os.path.basename(db_fn)
  84. assert os.path.exists(db_fn), os.path.abspath(db_fn)
  85. io.rm(out_file_fn) # in case of resume
  86. out_file_bfn = out_file_fn + '.tmp'
  87. params = dict(config)
  88. length_cutoff = params['length_cutoff']
  89. bash_cutoff = '{}'.format(length_cutoff)
  90. params['falcon_sense_option'] = get_falcon_sense_option(params.get('falcon_sense_option', ''), nproc)
  91. params['pa_dazcon_option'] = get_pa_dazcon_option(params.get('pa_dazcon_option', ''), nproc)
  92. params.update(locals()) # not needed
  93. LA4Falcon_flags = 'P' if params.get('LA4Falcon_preload') else ''
  94. if config["falcon_sense_skip_contained"]:
  95. LA4Falcon_flags += 'fso'
  96. elif config["falcon_sense_greedy"]:
  97. LA4Falcon_flags += 'fog'
  98. else:
  99. LA4Falcon_flags += 'fo'
  100. if LA4Falcon_flags:
  101. LA4Falcon_flags = '-' + ''.join(set(LA4Falcon_flags))
  102. run_consensus = "LA4Falcon -H$CUTOFF %s {db_fn} {las_fn} | python3 -m falcon_kit.mains.consensus {falcon_sense_option} >| {out_file_bfn}" % LA4Falcon_flags
  103. if config.get('dazcon', False):
  104. run_consensus = """
  105. which dazcon
  106. dazcon {pa_dazcon_option} -s {db_fn} -a {las_fn} >| {out_file_bfn}
  107. """
  108. script = """
  109. set -o pipefail
  110. CUTOFF=%(bash_cutoff)s
  111. %(run_consensus)s
  112. mv -f {out_file_bfn} {out_file_fn}
  113. """ % (locals())
  114. return script.format(**params)
  115. def run(config_fn, length_cutoff_fn, las_fn, db_fn, nproc,
  116. fasta_fn):
  117. job_done_fn = 'job.done'
  118. length_cutoff = int(open(length_cutoff_fn).read())
  119. config = io.deserialize(config_fn)
  120. config['length_cutoff'] = length_cutoff
  121. dbdir = config.get('LA4Falcon_dbdir')
  122. if dbdir:
  123. # Assume we are 2 levels deeper than consensus_split was.
  124. parent3 = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd()))))
  125. dbdir = os.path.join(config['LA4Falcon_dbdir'], 'fc-db') + parent3
  126. bn = os.path.basename(db_fn)
  127. LOG.warning('Using symlinks to {} in LA4Falcon_dbdir={!r}'.format(bn, dbdir))
  128. db_fn = os.path.join(dbdir, bn)
  129. script = script_run_consensus(
  130. config, db_fn, las_fn,
  131. os.path.basename(fasta_fn), # not sure basename is really needed here
  132. nproc=nproc,
  133. )
  134. script_fn = 'run_consensus.sh'
  135. bash.write_script(script, script_fn, job_done_fn)
  136. io.syscall('bash -vex {}'.format(script_fn))
  137. class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
  138. pass
  139. def parse_args(argv):
  140. description = 'Run consensus on a merged .las file, to produce a fasta file of preads.'
  141. epilog = ''
  142. parser = argparse.ArgumentParser(
  143. description=description,
  144. epilog=epilog,
  145. formatter_class=HelpF,
  146. )
  147. parser.add_argument(
  148. '--nproc',
  149. help='Number of processors to be used.')
  150. parser.add_argument(
  151. '--las-fn',
  152. help='Input. Merged .las file.',
  153. )
  154. parser.add_argument(
  155. '--db-fn',
  156. help='Input. Dazzler DB of raw-reads.',
  157. )
  158. parser.add_argument(
  159. '--length-cutoff-fn',
  160. help='Input. Contains a single integer, the length-cutoff.',
  161. )
  162. parser.add_argument(
  163. '--config-fn',
  164. help='Input. JSON of relevant configuration (currently from General section of full-prog config).',
  165. )
  166. parser.add_argument(
  167. '--fasta-fn',
  168. help='Output. Consensus fasta file.',
  169. )
  170. args = parser.parse_args(argv[1:])
  171. return args
  172. def main(argv=sys.argv):
  173. args = parse_args(argv)
  174. logging.basicConfig(level=logging.INFO)
  175. run(**vars(args))
  176. if __name__ == '__main__': # pragma: no cover
  177. main()