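"""Split/apply/combine helpers for converting PacBio subread BAM data into
dazzler .dexta files (via bam2fasta and dexta).

Three sub-commands are provided (see parse_args() below):

* split   -- write one unit-of-work per input bam-file (or subread dataset),
             plus a bash template to be run later,
* apply   -- run 'bam2fasta | dexta' for a single unit-of-work,
* combine -- gather the per-job .dexta outputs into a FOFN.
"""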
import argparse
import collections
import glob
import logging
import os
import re
import sys
import time
from .. import io, functional
from .. import (
    bash,  # for write_sub_script
    pype_tasks,  # for TASKS
)

LOG = logging.getLogger()
WAIT = 20  # seconds


def bam2dexta_split(bam_subreadset_fn, wildcards, split_fn, bash_template_fn):
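    """Split the bam2fasta/dexta work into units-of-work.

    Write the bash template for the 'apply' step to 'bash_template_fn', and
    serialize a list of job dicts (keys: 'input', 'output', 'params',
    'wildcards') to 'split_fn'. For now the whole dataset is a single
    unit-of-work, which loses parallelism but avoids a pbcore dependency.
    """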
    assert bam_subreadset_fn.endswith('.xml')
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_BAM2DEXTA_APPLY_SCRIPT)
    split_dataset_prefix = os.path.join(os.getcwd(), 'split')  # TODO: Test this as relative sub-dir.
    #from ..util import dataset_split  # introduces pbcore dependency
    #bam_paths = dataset_split.split_dataset(bam_subreadset_fn, split_dataset_prefix)
    bam_paths = [bam_subreadset_fn]  # Lose parallelism, but avoid pbcore.

    jobs = list()
    for i, bam_fn in enumerate(bam_paths):
        job_id = 'b_{:03d}'.format(i)
        # (Disabled) Write the bam-paths for this job.
        #input_dir = os.path.join('bam2dexta-scripts', job_id)
        #bam_paths_fn = os.path.join('.', input_dir, 'bam-paths.json')
        #io.mkdirs(input_dir)
        #io.serialize(bam_paths_fn, bam_paths)

        # Record in a job-dict.
        dexta_fn = 'subreads.{}.dexta'.format(job_id)
        job = dict()
        job['input'] = dict(
            bam=bam_fn,
        )
        job['output'] = dict(
            dexta=dexta_fn,
        )
        job['params'] = dict()
        job['wildcards'] = {wildcards: job_id}
        jobs.append(job)
    io.serialize(split_fn, jobs)


def bam2dexta_apply(bam_fn, dexta_fn):
    """Given a bam subread DataSet, write a .dexta file.

    Requires 'bam2fasta' and 'dexta' to be available on the PATH.
    """
    io.rm_force(dexta_fn)
    tmpdir = '.'  # There is no significant improvement from running on local disk.
    cmd = 'rm -f {dexta_fn}; ls -larth {tmpdir}; (bam2fasta -u -o - {bam_fn} | dexta -i >| {tmpdir}/foo.dexta); mv -f {tmpdir}/foo.dexta {dexta_fn}'.format(
        **locals())
    # Note: If 'dexta' fails, the script will error. So we might still have an empty foo.dexta,
    # but we will not have moved it to {dexta_fn}.
    io.syscall(cmd)


def bam2dexta_combine(gathered_fn, dexta_fofn_fn):
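    """Combine the gathered per-job outputs into a FOFN of .dexta paths.

    'gathered_fn' is a serialized list of single-entry job-output dicts;
    relative paths in it are resolved against its own directory. Since the
    outputs may appear late on a shared filesystem, wait up to WAIT seconds
    for each one before raising.
    """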
    gathered = io.deserialize(gathered_fn)
    d = os.path.abspath(os.path.realpath(os.path.dirname(gathered_fn)))

    def abspath(fn):
        if os.path.isabs(fn):
            return fn  # I expect this never to happen though.
        return os.path.join(d, fn)

    dexta_fns = list()
    for job_output in gathered:
        assert len(job_output) == 1, 'len(job_output) == {} != 1'.format(len(job_output))
        for fn in list(job_output.values()):
            abs_fn = abspath(fn)
            dexta_fns.append(abs_fn)

    dexta_paths = list()
    for dexta_fn in sorted(dexta_fns):
        if not os.path.exists(dexta_fn):
            msg = 'Did not find {!r}. Waiting {} seconds.'.format(dexta_fn, WAIT)
            LOG.info(msg)
            time.sleep(WAIT)
            if not os.path.exists(dexta_fn):
                msg = 'Did not find {!r}, even after waiting {} seconds. Maybe retry later?'.format(dexta_fn, WAIT)
                raise Exception(msg)
        dexta_paths.append(dexta_fn)

    # Serialize result.
    #io.serialize(dexta_paths_fn, sorted(dexta_paths))
    with open(dexta_fofn_fn, 'w') as stream:
        stream.write('\n'.join(dexta_paths))
        stream.write('\n')


def setup_logging(log_level):
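    """Attach a stderr handler at 'log_level' to the module-level LOG."""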
    hdlr = logging.StreamHandler(sys.stderr)
    hdlr.setLevel(log_level)
    hdlr.setFormatter(logging.Formatter('[%(levelname)s]%(message)s'))
    LOG.addHandler(hdlr)
    LOG.setLevel(logging.NOTSET)
    LOG.info('Log-level: {}'.format(log_level))


def cmd_split(args):
    bam2dexta_split(
        args.bam_subreadset_fn,
        args.wildcards,
        args.split_fn, args.bash_template_fn,
    )


def cmd_apply(args):
    bam2dexta_apply(args.bam_fn, args.dexta_fn)


def cmd_combine(args):
    bam2dexta_combine(args.gathered_fn, args.dexta_fofn_fn)


#def get_ours(config_fn):
#    ours = dict()
#    config = io.deserialize(config_fn)
#    LOG.info('config({!r}):\n{}'.format(config_fn, config))
#    LOG.info('our subset of config:\n{}'.format(ours))
#    return ours


def add_split_arguments(parser):
    parser.add_argument(
        '--wildcards', default='bam2dexta0_id',
        help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
    )
    parser.add_argument(
        '--bam-subreadset-fn',
        help='input. Dataset (.xml) of bam files of subreads.',
    )
    parser.add_argument(
        '--split-fn', default='bam2dexta-uows.json',
        help='output. Units-of-work for bam2fasta/dexta.',
    )
    parser.add_argument(
        '--bash-template-fn', default='bash-template.sh',
        help='output. Script to apply later.',
    )


def add_apply_arguments(parser):
    parser.add_argument(
        '--bam-fn', required=True,
        help='input. bam or dataset',
    )
    parser.add_argument(
        '--dexta-fn', required=True,
        help='output. The dazzler (Gene Myers) dexta-file.',
    )


def add_combine_arguments(parser):
    parser.add_argument(
        '--gathered-fn', required=True,
        help='input. List of sentinels, produced by gen_parallel_tasks() gathering. The .dexta files are next to these.',
    )
    parser.add_argument(
        '--dexta-fofn-fn', required=True,
        help='output. FOFN of dexta paths.',
    )
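

# Help formatter which keeps raw (multi-line) help text while also showing
# argument defaults.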
class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


def parse_args(argv):
    description = 'Efficiently generate .dexta from BAM or subread datasets.'
    epilog = 'For more details on .dexta, see https://dazzlerblog.wordpress.com/command-guides/dextractor-command-guide/'
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=HelpF,
    )
    parser.add_argument(
        '--log-level', default='INFO',
        help='Python logging level.',
    )
    parser.add_argument(
        '--nproc', type=int, default=0,
        help='ignored for now, but non-zero will mean "No more than this."',
    )
    help_split = 'split the input into units-of-work, one per bam-file (or subread dataset file)'
    help_apply = 'run bam2fasta and dexta as a unit-of-work'
    help_combine = 'generate a FOFN of the resulting .dexta files'
    subparsers = parser.add_subparsers(help='sub-command help')

    parser_split = subparsers.add_parser('split',
                                         formatter_class=HelpF,
                                         description=help_split,
                                         epilog='',
                                         help=help_split)
    add_split_arguments(parser_split)
    parser_split.set_defaults(func=cmd_split)

    parser_apply = subparsers.add_parser('apply',
                                         formatter_class=HelpF,
                                         description=help_apply,
                                         epilog='',
                                         help=help_apply)
    add_apply_arguments(parser_apply)
    parser_apply.set_defaults(func=cmd_apply)

    parser_combine = subparsers.add_parser('combine',
                                           formatter_class=HelpF,
                                           description=help_combine,
                                           epilog='',
                                           help=help_combine)
    add_combine_arguments(parser_combine)
    parser_combine.set_defaults(func=cmd_combine)

    args = parser.parse_args(argv[1:])
    return args


def main(argv=sys.argv):
    args = parse_args(argv)
    setup_logging(args.log_level)
    args.func(args)


if __name__ == '__main__':  # pragma: no cover
    main()