- from future.utils import viewitems
- import argparse
- import collections
- import logging
- import os
- import string
- import sys
- from .. import io
- from .. import bash
- from .. import pype_tasks
# Module-level logger; handlers/level are configured in main() via logging.basicConfig().
LOG = logging.getLogger()
def corrected_relpath(p, was_rel_to):
    """Return *p* re-expressed as a path relative to the current directory.

    *p* was originally relative to the directory *was_rel_to*; absolute
    paths are returned unchanged.
    """
    if os.path.isabs(p):
        return p
    return os.path.normpath(os.path.relpath(os.path.join(was_rel_to, p)))
def read_gathered_las(path):
    """Return dict of int block-id -> [las_path].

    The file at *path* deserializes to a dict mapping block-id to a las
    path. Paths are re-rooted relative to the current directory, since
    they were recorded relative to the directory containing *path*.
    """
    result = collections.defaultdict(list)
    dn = os.path.normpath(os.path.dirname(path))
    p_id2las = io.deserialize(path)
    for block, las_path in p_id2las.items():
        result[int(block)].append(corrected_relpath(las_path, dn))
    return result
def pre_hook(config_fn, db_fn):
    """Run the user-supplied 'LA4Falcon_pre' shell hook, if configured.

    The hook is a string.Template which may reference $DB (absolute path
    of *db_fn*) and $DBDIR (a staging directory derived from the
    'LA4Falcon_dbdir' config value).
    """
    config = io.deserialize(config_fn)
    hook = config.get('LA4Falcon_pre')
    if hook:
        LOG.warning('Found LA4Falcon_pre in General section of cfg. About to run {!r}...'.format(hook))
        if config.get('LA4Falcon_preload'):
            LOG.error('Found both LA4Falcon_pre and LA4Falcon_preload. Why would you preload after you have copied the DB? I hope you know what you are doing.')
        db = os.path.abspath(db_fn)
        parent = os.path.abspath(os.path.dirname(os.getcwd()))
        # NOTE(review): string '+' (not os.path.join) is used because 'parent'
        # is absolute, so join() would discard the prefix. The result looks like
        # <LA4Falcon_dbdir>/fc-db<abs-parent-path> — presumably namespacing the
        # staging dir by the run's parent directory. Confirm this is intentional.
        # Also note config['LA4Falcon_dbdir'] raises KeyError if unset while the
        # hook is configured.
        dbdir = os.path.join(config['LA4Falcon_dbdir'], 'fc-db') + parent
        cmd = string.Template(hook).substitute(DB=db, DBDIR=dbdir)
        io.syscall(cmd)
def run(p_id2las_fn, db_fn, length_cutoff_fn, config_fn, wildcards,
        bash_template_fn, split_fn):
    """Scatter per-block merged .las files into parallel consensus jobs.

    Writes the consensus bash template to *bash_template_fn*, symlinks
    each block's .las under <split-dir>/cns-symlinks/, and serializes a
    JSON list of job dicts (input/output/params/wildcards) to *split_fn*.
    Finally runs the optional LA4Falcon_pre hook.
    """
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_CONSENSUS_TASK_SCRIPT)
    # Because DazzlerDB is not a "FileType" in pbcommand,
    # it might be a symlink with a weird extension.
    db_fn = os.path.realpath(db_fn)
    LOG.info('Scattering las from {!r} (based on {!r}) into {!r}.'.format(
        p_id2las_fn, db_fn, split_fn))
    wildcards = wildcards.split(',')
    outdir = os.path.abspath(os.path.dirname(split_fn))
    jobs = list()
    p_ids_merge_las = read_gathered_las(p_id2las_fn)
    for (p_id, las_fns) in p_ids_merge_las.items():
        # Each merge-task is for a single block, so exactly one las file.
        assert len(las_fns) == 1, repr(las_fns)
        las_fn = las_fns[0]
        cns_id = 'cns_%05d' % int(p_id)
        cns_id2 = cns_id
        symlinked_las_fn = '{outdir}/cns-symlinks/{cns_id}/merged.{cns_id2}.las'.format(**locals())
        io.mkdirs(os.path.normpath(os.path.dirname(symlinked_las_fn)))
        # Link relative to the symlink's own directory, so the tree is relocatable.
        src = os.path.relpath(las_fn,
                              os.path.normpath(os.path.dirname(symlinked_las_fn)))
        io.symlink(src, symlinked_las_fn)
        # Record in a job-dict.
        job = dict()
        job['input'] = dict(
            las=symlinked_las_fn,
            db=db_fn,
            length_cutoff=length_cutoff_fn,
            config=config_fn,
        )
        job['output'] = dict(
            fasta='consensus.{cns_id2}.fasta'.format(**locals()),
        )
        job['params'] = dict(
        )
        job['wildcards'] = {wildcards[0]: cns_id, wildcards[1]: cns_id}
        jobs.append(job)
    io.serialize(split_fn, jobs)
    pre_hook(config_fn, db_fn)
class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that keeps raw text AND shows argument defaults."""
    pass


def parse_args(argv):
    """Parse command-line arguments (argv[0] is the program name).

    Returns the argparse.Namespace; all options default to None.
    """
    description = 'Prepare for parallel consensus jobs.'
    epilog = ''
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=HelpF,
    )
    parser.add_argument(
        '--p-id2las-fn',
        help='Input. JSON dict of p-id to las.')
    parser.add_argument(
        '--db-fn',
        help='Input. Dazzler DB of raw_reads.')
    parser.add_argument(
        '--length-cutoff-fn',
        help='Input. Contains a single integer, the length-cutoff.')
    parser.add_argument(
        '--config-fn',
        help='Input. JSON of relevant configuration (currently from General section of full-prog config).')
    parser.add_argument(
        '--wildcards',
        help='Input. Comma-separated wildcard names. Might be needed downstream.')
    parser.add_argument(
        '--split-fn',
        help='Output. JSON list of jobs, where each is a dict of input/output/params/wildcards.')
    parser.add_argument(
        '--bash-template-fn',
        help='Output. Copy of known daligner bash template, for use later.')
    args = parser.parse_args(argv[1:])
    return args
def main(argv=sys.argv):
    """CLI entry point: parse args, set up logging, and dispatch to run()."""
    parsed = parse_args(argv)
    logging.basicConfig(level=logging.INFO)
    run(**vars(parsed))
# Script entry point.
if __name__ == '__main__':  # pragma: no cover
    main()