123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- import argparse
- import logging
- import multiprocessing
- import os
- import re
- import sys
- from .. import io
- from .. import bash
- from .dazzler import symlink_db
# Root logger (getLogger() called without a name); main() configures it
# through logging.basicConfig().
LOG = logging.getLogger()
def get_option_with_proper_nproc(regexp, opt, opt_name, nproc, cpu_count=multiprocessing.cpu_count()):
    r"""Return opts sans the regexp match, and proper nproc.

    The chosen processor count is the smallest of the job-level ``nproc``,
    the count embedded in ``opt`` (when ``regexp`` matches) and ``cpu_count``.

    Args:
        regexp: Compiled pattern whose group(1) captures the processor count
            embedded in ``opt``.
        opt: Option string that may carry its own processor-count flag.
        opt_name: Name of the option being processed; used in the warnings so
            the log names the option that actually conflicted.
        nproc: Job-level processor count (anything ``int()`` accepts).
        cpu_count: Upper bound for the result. The default is evaluated once,
            when this function is defined.

    Returns:
        Tuple ``(opt, nproc)``: ``opt`` with every match of ``regexp``
        removed, and the chosen processor count as an int.

    >>> regexp = re.compile(r'-j[^\d]*(\d+)')
    >>> get_option_with_proper_nproc(regexp, 'foo -j 5', 'baz', nproc=7, cpu_count=6)
    ('foo ', 5)
    >>> get_option_with_proper_nproc(regexp, 'foo -j 5', 'baz', nproc=3, cpu_count=4)
    ('foo ', 3)
    >>> get_option_with_proper_nproc(regexp, 'foo -j 5', 'baz', nproc=3, cpu_count=2)
    ('foo ', 2)
    """
    job_nproc = int(nproc)
    mo = regexp.search(opt)
    if mo:
        opt_nproc = int(mo.group(1))
        if job_nproc < opt_nproc:
            LOG.warning('NPROC={}, but {}="{}", so we will ignore that option and use {}'.format(
                job_nproc, opt_name, opt, job_nproc))
        elif job_nproc > opt_nproc:
            LOG.warning('NPROC={}, but {}="{}", so we will override NPROC and use {}'.format(
                job_nproc, opt_name, opt, opt_nproc))
        nproc = min(job_nproc, opt_nproc)
        # Strip the embedded flag; the returned nproc now carries the count.
        opt = regexp.sub('', opt)
    else:
        nproc = job_nproc
    if nproc > cpu_count:
        LOG.warning('Requested nproc={} > cpu_count={}; using {}'.format(
            nproc, cpu_count, cpu_count))
        nproc = cpu_count
    return opt, nproc
def get_falcon_sense_option(opt, nproc):
    """Return ``opt`` with a single ``--n-core=<N>`` flag appended.

    Any ``--n-core <digits>`` already present in ``opt`` is removed first;
    ``N`` is whatever get_option_with_proper_nproc() settles on.

    >>> get_falcon_sense_option('', 11)
    ' --n-core=11'
    >>> get_falcon_sense_option('--n-core=24', 10)
    ' --n-core=10'
    """
    n_core_pattern = re.compile(r'--n-core[^\d]+(\d+)')
    stripped, n_core = get_option_with_proper_nproc(
        n_core_pattern, opt, 'falcon_sense_option', nproc)
    return stripped + ' --n-core={}'.format(n_core)
def get_pa_dazcon_option(opt, nproc):
    """Return ``opt`` with a single ``-j <N>`` flag appended.

    Any ``-j <digits>`` already present in ``opt`` is removed first; ``N`` is
    whatever get_option_with_proper_nproc() settles on.

    >>> get_pa_dazcon_option('', 12)
    ' -j 12'
    >>> get_pa_dazcon_option('-j 48', 13)
    ' -j 13'
    """
    j_pattern = re.compile(r'-j[^\d]+(\d+)')
    stripped, n_jobs = get_option_with_proper_nproc(
        j_pattern, opt, 'pa_dazcon_option', nproc)
    return stripped + ' -j {}'.format(n_jobs)
def symlink(actual):
    """Symlink into cwd, without relativizing.

    Creates (or refreshes) a symlink in the current directory, named after
    the basename of ``actual`` and pointing at ``actual`` exactly as given.

    Args:
        actual: Path of the link target; stored in the link verbatim.

    Returns:
        None. Nothing is done (beyond a warning) when ``actual`` already is
        the cwd entry of that name, and nothing is changed when a link with
        the same target is already in place.

    Raises:
        OSError: if an existing entry cannot be removed (for instance when it
            is a directory) or the link cannot be created.
    """
    symbolic = os.path.basename(actual)
    if os.path.abspath(actual) == os.path.abspath(symbolic):
        LOG.warning('Cannot symlink {!r} as {!r}, itself.'.format(actual, symbolic))
        return
    LOG.info('ln -sf {} {}'.format(actual, symbolic))
    if os.path.lexists(symbolic):
        # os.readlink() raises OSError on anything that is not a symlink, so
        # test islink() first; a stale regular file is replaced like
        # `ln -sf` would.
        if os.path.islink(symbolic) and os.readlink(symbolic) == actual:
            return
        os.unlink(symbolic)
    os.symlink(actual, symbolic)
- # This function was copied from bash.py and modified.
def script_run_consensus(config, db_fn, las_fn, out_file_fn, nproc):
    """Return the text of a bash script that runs consensus on one .las file.

    config: dazcon, falcon_sense_greedy, falcon_sense_skip_contained, LA4Falcon_preload
    are consulted here, together with length_cutoff (required) and the
    optional falcon_sense_option / pa_dazcon_option strings.

    Besides building the script this calls symlink_db() for db_fn, asserts
    that the DB basename exists in the current directory, and removes any
    existing out_file_fn.

    The generated script writes to out_file_fn + '.tmp' and renames that to
    out_file_fn as its final step.
    """
    # NOTE(review): symlink_db comes from .dazzler; presumably it links the DB
    # files into the cwd using the supplied symlink callable -- confirm there.
    symlink_db(db_fn, symlink=symlink)
    # From here on the DB is addressed by basename, i.e. relative to the cwd.
    db_fn = os.path.basename(db_fn)
    assert os.path.exists(db_fn), os.path.abspath(db_fn)
    io.rm(out_file_fn) # in case of resume
    out_file_bfn = out_file_fn + '.tmp'
    # Copy, so the caller's config is not modified by the updates below.
    params = dict(config)
    length_cutoff = params['length_cutoff']
    bash_cutoff = '{}'.format(length_cutoff)
    params['falcon_sense_option'] = get_falcon_sense_option(params.get('falcon_sense_option', ''), nproc)
    params['pa_dazcon_option'] = get_pa_dazcon_option(params.get('pa_dazcon_option', ''), nproc)
    # An earlier comment called this "not needed", but it is what puts db_fn
    # (now the basename), las_fn, out_file_fn and out_file_bfn into params for
    # the final .format(**params) -- unless config already carries those keys.
    params.update(locals())
    LA4Falcon_flags = 'P' if params.get('LA4Falcon_preload') else ''
    # These two keys are indexed directly, so a config lacking
    # falcon_sense_skip_contained raises KeyError here.
    if config["falcon_sense_skip_contained"]:
        LA4Falcon_flags += 'fso'
    elif config["falcon_sense_greedy"]:
        LA4Falcon_flags += 'fog'
    else:
        LA4Falcon_flags += 'fo'
    # Always true at this point: every branch above appended at least 'fo'.
    if LA4Falcon_flags:
        # set() drops duplicate letters; the resulting letter order is not fixed.
        LA4Falcon_flags = '-' + ''.join(set(LA4Falcon_flags))
    # $CUTOFF is a shell variable assigned inside the script template below.
    run_consensus = "LA4Falcon -H$CUTOFF %s {db_fn} {las_fn} | python3 -m falcon_kit.mains.consensus {falcon_sense_option} >| {out_file_bfn}" % LA4Falcon_flags
    if config.get('dazcon', False):
        # dazcon replaces the LA4Falcon pipeline entirely.
        run_consensus = """
which dazcon
dazcon {pa_dazcon_option} -s {db_fn} -a {las_fn} >| {out_file_bfn}
"""
    # Two-stage substitution: '%' fills %(bash_cutoff)s and %(run_consensus)s
    # from the local variables (by name), then .format() fills the {braces}
    # from params. Renaming a local therefore changes the output.
    script = """
set -o pipefail
CUTOFF=%(bash_cutoff)s
%(run_consensus)s
mv -f {out_file_bfn} {out_file_fn}
""" % (locals())
    return script.format(**params)
def run(config_fn, length_cutoff_fn, las_fn, db_fn, nproc,
        fasta_fn):
    """Generate run_consensus.sh for one merged .las file and execute it.

    Args:
        config_fn: Serialized configuration, read with io.deserialize().
        length_cutoff_fn: File containing a single integer, the length cutoff.
        las_fn: Merged .las file passed through to the consensus script.
        db_fn: Dazzler DB path; replaced by a path under LA4Falcon_dbdir when
            that config key is set.
        nproc: Processor count passed through to script_run_consensus().
        fasta_fn: Output path; only its basename is handed to the script.
    """
    job_done_fn = 'job.done'
    # Context manager so the file handle is closed as soon as it is read.
    with open(length_cutoff_fn) as stream:
        length_cutoff = int(stream.read())
    config = io.deserialize(config_fn)
    config['length_cutoff'] = length_cutoff
    dbdir = config.get('LA4Falcon_dbdir')
    if dbdir:
        # Assume we are 2 levels deeper than consensus_split was.
        parent3 = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd()))))
        # String concatenation, not join: parent3 is absolute, and
        # os.path.join() would throw away everything in front of it.
        dbdir = os.path.join(dbdir, 'fc-db') + parent3
        bn = os.path.basename(db_fn)
        LOG.warning('Using symlinks to {} in LA4Falcon_dbdir={!r}'.format(bn, dbdir))
        db_fn = os.path.join(dbdir, bn)
    script = script_run_consensus(
        config, db_fn, las_fn,
        os.path.basename(fasta_fn), # not sure basename is really needed here
        nproc=nproc,
    )
    script_fn = 'run_consensus.sh'
    bash.write_script(script, script_fn, job_done_fn)
    io.syscall('bash -vex {}'.format(script_fn))
class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter mixing RawTextHelpFormatter and ArgumentDefaultsHelpFormatter."""
def parse_args(argv):
    """Parse the command line of the consensus task.

    ``argv`` is the full argument vector; its first element (the program
    name) is skipped. Returns the argparse namespace with attributes nproc,
    las_fn, db_fn, length_cutoff_fn, config_fn and fasta_fn, each a string
    or None when the flag was not given.
    """
    parser = argparse.ArgumentParser(
        description='Run consensus on a merged .las file, to produce a fasta file of preads.',
        epilog='',
        formatter_class=HelpF,
    )
    # (flag, help text) pairs, registered in the order shown by --help.
    flag_help = (
        ('--nproc', 'Number of processors to be used.'),
        ('--las-fn', 'Input. Merged .las file.'),
        ('--db-fn', 'Input. Dazzler DB of raw-reads.'),
        ('--length-cutoff-fn', 'Input. Contains a single integer, the length-cutoff.'),
        ('--config-fn', 'Input. JSON of relevant configuration (currently from General section of full-prog config).'),
        ('--fasta-fn', 'Output. Consensus fasta file.'),
    )
    for flag, text in flag_help:
        parser.add_argument(flag, help=text)
    return parser.parse_args(argv[1:])
def main(argv=sys.argv):
    """Command-line entry point: parse ``argv``, set up logging, call run().

    Logging is configured at INFO level after the arguments are parsed; the
    parsed options are forwarded to run() as keyword arguments.
    """
    options = vars(parse_args(argv))
    logging.basicConfig(level=logging.INFO)
    run(**options)
# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__': # pragma: no cover
    main()
|