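"""Workflow driver for FALCON assembly (a summary docstring; inferred from the
code below): builds the pypeFlow task graph for stage 0 (raw-read overlap and
consensus), stage 1 (pread overlap), and stage 2 (string-graph assembly), then
runs it.
"""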
from ..pype import (wrap_gen_task as gen_task, gen_parallel_tasks, Dist)
from .. import run_support
from .. import bash, pype_tasks, snakemake
from ..util.system import (only_these_symlinks, lfs_setstripe_maybe)
from .. import io
from .. import functional
# pylint: disable=no-name-in-module, import-error, fixme, line-too-long
from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
                                             makePypeLocalFile, PypeTask)
import argparse
import glob
import json
import logging
import os
import re
import sys
import time

LOG = logging.getLogger(__name__)  # default, for remote tasks


def check_general_config(general_config, input_config_fn):
    if ('pa_daligner_option' not in general_config or
            'ovlp_daligner_option' not in general_config):
        msg = '''Missing options.
We now require both "pa_daligner_option" (stage 0) and "ovlp_daligner_option" (stage 1),
which are automatically passed along to
  HPC.daligner
  HPC.TANmask
  HPC.REPmask
These can provide additional flags:
  pa_HPCdaligner_option
  pa_HPCTANmask_option
  ovlp_HPCdaligner_option
  pa_REPmask_code (-g/-c pairs for 3 iterations, e.g. '1,20;5,15;20,10')
'''
        raise Exception(msg)
    required = ('input_fofn', 'genome_size')
    for name in required:
        assert name in general_config, 'Missing "{}" in {}.'.format(name, input_config_fn)
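
# For reference, a minimal "[General]" section satisfying the check above might
# look like the following sketch (the values are illustrative, not
# recommendations):
#
#   [General]
#   input_fofn = input.fofn
#   genome_size = 4652500
#   pa_daligner_option = -e0.80 -l1000
#   ovlp_daligner_option = -e0.96 -l1000
#   pa_REPmask_code = 1,20;5,15;20,10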


def main1(prog_name, input_config_fn, logger_config_fn=None):
    global LOG
    LOG = run_support.setup_logger(logger_config_fn)
    lfs_setstripe_maybe(path='.', stripe=12)

    LOG.info('fc_run started with configuration %s', input_config_fn)
    try:
        config = run_support.parse_cfg_file(input_config_fn)
        dumped = json.dumps(config, indent=2, separators=(',', ': '), sort_keys=True)
        LOG.info('cfg=\n{}'.format(dumped))
    except Exception:
        LOG.exception('Failed to parse config "{}".'.format(input_config_fn))
        raise
    general_config = config['General']
    check_general_config(general_config, input_config_fn)
    input_fofn_fn = general_config['input_fofn']
    genome_size = int(general_config['genome_size'])
    squash = 0 < genome_size < 1000000
    wf = PypeProcWatcherWorkflow(
        job_defaults=config['job.defaults'],
        squash=squash,
    )
    general_config['ver'] = '100'
    # Store config as JSON, available to many tasks.
    config_fn = './config.json'  # must not be in a task-dir
    io.serialize(config_fn, config)
    run(wf, config,
        os.path.abspath(config_fn),
        input_fofn_fn=input_fofn_fn,
        )
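
# Note: run() re-reads config.json via io.deserialize() and raises if it does
# not match the config object passed in, so any task that consumes config.json
# sees exactly the state serialized by main1() above.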


def add_bam2dexta_tasks(
        wf,
        config,
        input_fofn_fn, rawread_dir):
    # run bam2dexta
    bam2dexta_uows_fn = os.path.join(
        rawread_dir, 'bam2dexta-split', 'bam2dexta-uows.json')
    bam2dexta_bash_template_fn = os.path.join(
        rawread_dir, 'bam2dexta-split', 'bash_template.sh')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_BAM2DEXTA_SPLIT_SCRIPT,
        inputs={
            'bam': input_fofn_fn,
        },
        outputs={
            'split': bam2dexta_uows_fn,
            'bash_template': bam2dexta_bash_template_fn,
        },
        parameters={
            'wildcards': 'bam2dexta_id',
        },
        dist=Dist(local=True),
    ))

    gathered_fn = os.path.join(rawread_dir, 'bam2dexta-gathered', 'gathered-dexta-files.json')
    gen_parallel_tasks(
        wf,
        bam2dexta_uows_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=bam2dexta_bash_template_fn,
            script='fubar-TODO',  # pype_tasks.TASK_DB_TAN_APPLY_SCRIPT, # for snakemake stuff
            inputs={
                'units_of_work': '0-rawreads/bam2dexta-chunks/{bam2dexta_id}/some-units-of-work.json',
            },
            outputs={
                'results': '0-rawreads/bam2dexta-runs/{bam2dexta_id}/some-done-files.json',
            },
            parameters={},
        ),
        dist=Dist(NPROC=1, MB=4000, job_dict=config['job.step.da']),
    )

    input_fofn_fn = os.path.join(rawread_dir, 'bam2dexta-combine', 'input.fofn')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_BAM2DEXTA_COMBINE_SCRIPT,
        inputs={
            'gathered': gathered_fn,
        },
        outputs={
            'fofn': input_fofn_fn,
        },
        parameters={},
        dist=Dist(local=True),
    ))
    return input_fofn_fn
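
# Note: add_bam2dexta_tasks() above follows the split/apply/combine idiom used
# throughout this module: a "split" task writes units-of-work as JSON,
# gen_parallel_tasks() fans them out as scattered jobs, and a "combine" task
# gathers the results (here, a FOFN of .dexta files) for downstream consumers.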


def run(wf, config,
        config_fn,
        input_fofn_fn,
        ):
    """
    Preconditions (for now):
    * LOG
    * run_support.logger
    """
    parsed_config = io.deserialize(config_fn)
    if parsed_config != config:
        msg = 'Config from {!r} != passed config'.format(config_fn)
        raise Exception(msg)
    general_config = config['General']
    general_config_fn = os.path.join(os.path.dirname(config_fn), 'General_config.json')
    io.serialize(general_config_fn, general_config)  # Some tasks use this.
    rawread_dir = '0-rawreads'
    pread_dir = '1-preads_ovl'
    falcon_asm_dir = '2-asm-falcon'

    for d in (rawread_dir, pread_dir, falcon_asm_dir):
        run_support.make_dirs(d)

    # njobs only matters for parallel jobs.
    job_defaults = config['job.defaults']
    #exitOnFailure = bool(job_defaults.get('stop_all_jobs_on_failure', False))
    global default_njobs
    default_njobs = int(job_defaults.get('njobs', 7))
    wf.max_jobs = default_njobs

    assert general_config['input_type'] in (
        'raw', 'preads'), 'Invalid input_type=={!r}'.format(general_config['input_type'])

    parameters = {}

    # ====> 0-rawreads
    if general_config['input_type'] == 'raw':
        # Most common workflow: Start with rawreads.
        if input_fofn_fn.endswith('.xml'):
            input_fofn_fn = add_bam2dexta_tasks(wf, config, input_fofn_fn, rawread_dir)

        # import sequences into daligner DB
        # calculate length_cutoff (if specified as -1)
        # split DB
        # run DBdust
        r_db_dust_fn = os.path.join(rawread_dir, 'build', 'raw_reads.db')
        length_cutoff_fn = os.path.join(rawread_dir, 'build', 'length_cutoff')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_DB_BUILD_SCRIPT,
            inputs={
                'config': general_config_fn,
                'input_fofn': input_fofn_fn,
            },
            outputs={
                'length_cutoff': length_cutoff_fn,
                'db': r_db_dust_fn,
                # Also .raw_reads.*, of course. And dust track.
            },
            parameters=dict(
            ),
            dist=Dist(NPROC=1, job_dict=config['job.step.dust']),
        ))

        # run TANmask
        tan_uows_fn = os.path.join(
            rawread_dir, 'tan-split', 'tan-uows.json')
        tan_bash_template_fn = os.path.join(
            rawread_dir, 'tan-split', 'bash_template.sh')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_DB_TAN_SPLIT_SCRIPT,
            inputs={
                'config': general_config_fn,
                'db': r_db_dust_fn,
            },
            outputs={
                'split': tan_uows_fn,
                'bash_template': tan_bash_template_fn,
            },
            parameters={},
            dist=Dist(NPROC=1),
        ))

        gathered_fn = os.path.join(rawread_dir, 'tan-gathered', 'gathered-done-files.json')
        gen_parallel_tasks(
            wf,
            tan_uows_fn, gathered_fn,
            run_dict=dict(
                bash_template_fn=tan_bash_template_fn,
                script='fubar-TODO',  # pype_tasks.TASK_DB_TAN_APPLY_SCRIPT, # for snakemake stuff
                inputs={
                    'units_of_work': '0-rawreads/tan-chunks/{tan0_id}/some-units-of-work.json',
                },
                outputs={
                    #'job_done': '0-rawreads/{dal0_id}/daligner.done',
                    'results': '0-rawreads/tan-runs/{tan0_id}/some-done-files.json',
                },
                parameters={},
            ),
            dist=Dist(NPROC=4, MB=4000, job_dict=config['job.step.da']),
        )

        r_db_tan_fn = os.path.join(rawread_dir, 'tan-combine', 'raw_reads.db')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_DB_TAN_COMBINE_SCRIPT,
            inputs={
                'config': general_config_fn,
                'db': r_db_dust_fn,
                'gathered': gathered_fn,
            },
            outputs={
                'new_db': r_db_tan_fn,
            },
            parameters={},
            dist=Dist(local=True),
        ))

        #### HPC.REPmask/daligner/LAmerge
        codes = functional.parse_REPmask_code(general_config['pa_REPmask_code'])
        LOG.info('Parsed pa_REPmask_code (repa,repb,repc): {!r}'.format(codes))
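        # e.g. '1,20;5,15;20,10' -> [(1, 20), (5, 15), (20, 10)],
        # one (group_size, coverage_limit) pair per REPmask iteration (a, b, c).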

        ### REPmask tasks (a, b, c), one iteration per code pair.
        db_fn = r_db_tan_fn
        for letter, (group_size, coverage_limit) in zip('abc', codes):
            db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
                                  general_config_fn, db_fn, length_cutoff_fn,
                                  letter, group_size, coverage_limit)
        r_db_rep_fn = db_fn

        #### basic daligner/LAmerge
        p_id2las_fn = os.path.join(rawread_dir, 'las-merge-combine', 'p_id2las.json')
        las_fofn_fn = os.path.join(rawread_dir, 'las-merge-combine', 'las_fofn.json')
        add_daligner_and_merge_tasks(
            wf,
            general_config, config['job.step.da'], config['job.step.la'],
            rawread_dir,
            general_config_fn, r_db_rep_fn,
            length_cutoff_fn,
            p_id2las_fn, las_fofn_fn,
            daligner_wildcard='dal0_id',
            lamerge_wildcard='mer0_id',
            daligner_params={},
            db_prefix='raw_reads',  # TODO: Infer
            daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
        )
        ####

        if general_config['target'] == 'overlapping':
            sys.exit(0)

        # Produce new FOFN of preads fasta, based on consensus of overlaps.
        split_fn = os.path.join(
            rawread_dir, 'cns-split', 'split.json')
        bash_template_fn = os.path.join(
            rawread_dir, 'cns-split', 'consensus-bash-template.sh')
        params = dict(parameters)
        params['wildcards'] = 'cns0_id,cns0_id2'
        wf.addTask(gen_task(
            script=pype_tasks.TASK_CONSENSUS_SPLIT_SCRIPT,
            inputs={
                'p_id2las': p_id2las_fn,
                'raw_reads_db': r_db_rep_fn,
                'length_cutoff': length_cutoff_fn,
                'config': general_config_fn,
            },
            outputs={
                'split': split_fn,
                'bash_template': bash_template_fn,
            },
            parameters=params,
            dist=Dist(local=True),
        ))

        gathered_fn = os.path.join(rawread_dir, 'cns-gather', 'gathered.json')
        gen_parallel_tasks(
            wf,
            split_fn, gathered_fn,
            run_dict=dict(
                bash_template_fn=bash_template_fn,
                script=pype_tasks.TASK_CONSENSUS_TASK_SCRIPT,  # for snakemake only
                inputs={
                    #'las': '0-rawreads/cns-split/{cns0_id}/merged.{cns0_id2}.las',
                    #'db': r_db_rep_fn,
                    #'length_cutoff': length_cutoff_fn,
                    #'config': general_config_fn,
                    'units_of_work': '0-rawreads/cns-chunks/{cns0_id}/some-units-of-work.json',
                },
                outputs={
                    #'fasta': '0-rawreads/consensus/{cns0_id}/consensus.{cns0_id2}.fasta',
                    'results': '0-rawreads/cns-runs/{cns0_id}/some-done-files.json',
                },
                parameters={},
            ),
            dist=Dist(NPROC=6, job_dict=config['job.step.cns']),
        )

        preads_fofn_fn = os.path.join(rawread_dir, 'preads', 'input_preads.fofn')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_CONSENSUS_GATHER_SCRIPT,
            inputs={
                'gathered': gathered_fn,
                'config': general_config_fn,
                'raw_reads_db': r_db_rep_fn,
            },
            outputs={
                'preads_fofn': preads_fofn_fn,
            },
            parameters=parameters,  # {}
            dist=Dist(local=True),
        ))

        rdir = os.path.join(rawread_dir, 'report')
        pre_assembly_report_fn = os.path.join(rdir, 'pre_assembly_stats.json')
        params = dict(parameters)
        params['length_cutoff_user'] = general_config['length_cutoff']
        params['genome_length'] = general_config['genome_size']  # note different name; historical
        wf.addTask(gen_task(
            script=pype_tasks.TASK_REPORT_PRE_ASSEMBLY_SCRIPT,
            inputs={
                'length_cutoff': length_cutoff_fn,
                'raw_reads_db': r_db_rep_fn,
                'preads_fofn': preads_fofn_fn,
                'config': general_config_fn,
            },
            outputs={
                'pre_assembly_report': pre_assembly_report_fn,
            },
            parameters=params,
            dist=Dist(local=True),
        ))

        if general_config['target'] == 'pre-assembly':
            wf.refreshTargets()
            LOG.info('Quitting after stage-0 for General.target=pre-assembly')
            return

    # build pread database
    if general_config['input_type'] == 'preads':
        LOG.info('General.input_type=preads, so we skip stage 0-rawreads.')
        preads_fofn_fn = general_config['input_fofn']
        assert os.path.exists(preads_fofn_fn), '{!r} does not exist.'.format(preads_fofn_fn)

    # ====> 1-preads_ovl
    # pdb_build_done = os.path.join(pread_dir, 'pdb_build_done')
    # run_jobs_fn = os.path.join(pread_dir, 'run_jobs.sh')
    preads_db_fn = os.path.join(pread_dir, 'build', 'preads.db')
    length_cutoff_pr_fn = os.path.join(pread_dir, 'build', 'length_cutoff')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_BUILD_SCRIPT,
        inputs={
            'config': general_config_fn,
            'input_fofn': preads_fofn_fn,
        },
        outputs={
            'length_cutoff': length_cutoff_pr_fn,
            'db': preads_db_fn,
            # Also .preads.*, of course.
        },
        parameters=dict(
        ),
        dist=Dist(NPROC=1, job_dict=config['job.step.dust']),
    ))

    ####
    p_id2las_fn = os.path.join(pread_dir, 'las-merge-combine', 'block2las.json')
    las_fofn_fn = os.path.join(pread_dir, 'las-merge-combine', 'las_fofn.json')
    add_daligner_and_merge_tasks(
        wf,
        general_config, config['job.step.pda'], config['job.step.pla'],
        pread_dir,
        general_config_fn, preads_db_fn,  # no tan-mask for preads
        length_cutoff_pr_fn,
        p_id2las_fn, las_fofn_fn,
        daligner_wildcard='dal1_id',
        lamerge_wildcard='mer1_id',
        daligner_params={},
        db_prefix='preads',  # TODO: Infer
        daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
    )
    ####

    db2falcon_dir = os.path.join(pread_dir, 'db2falcon')
    db2falcon_done_fn = os.path.join(db2falcon_dir, 'db2falcon_done')
    preads4falcon_fn = os.path.join(db2falcon_dir, 'preads4falcon.fasta')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_RUN_DB_TO_FALCON_SCRIPT,
        inputs={
            'p_id2las': p_id2las_fn,
            'preads_db': preads_db_fn,
        },
        outputs={
            'job_done': db2falcon_done_fn,
            'preads4falcon': preads4falcon_fn,
        },
        parameters={},
        dist=Dist(NPROC=4, job_dict=config['job.step.asm']),
    ))

    # ====> 2-asm-falcon
    falcon_asm_done_fn = os.path.join(falcon_asm_dir, 'falcon_asm_done')
    for key in ('overlap_filtering_setting', 'length_cutoff_pr', 'fc_ovlp_to_graph_option'):
        parameters[key] = general_config[key]
    wf.addTask(gen_task(
        script=pype_tasks.TASK_RUN_FALCON_ASM_SCRIPT,
        inputs={
            'db2falcon_done': db2falcon_done_fn,
            'db_file': preads_db_fn,
            'preads4falcon_fasta': preads4falcon_fn,
            'las_fofn': las_fofn_fn,
            'config': general_config_fn,
        },
        outputs={'falcon_asm_done': falcon_asm_done_fn},
        parameters=parameters,
        dist=Dist(NPROC=4, job_dict=config['job.step.asm']),
    ))
    wf.refreshTargets()

    with io.cd('0-rawreads'):
        # for backwards-compatibility
        io.symlink('las-merge-combine', 'las-gather')
    #return falcon_asm_done


def add_daligner_and_merge_tasks(
        wf,
        general_config, daligner_job_config, merge_job_config,
        super_dir,
        general_config_fn, db_fn,
        length_cutoff_fn,  # not always needed (refactor later)
        p_id2las_fn, las_fofn_fn,
        daligner_wildcard,  # ='dal0_id'
        lamerge_wildcard,  # ='mer0_id'
        daligner_params=dict(),
        db_prefix='raw_reads',
        daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
):
    """
    Results:
        p_id2las_fn, las_fofn_fn
    """
    parameters = dict()

    # run daligner
    daligner_all_units_fn = os.path.join(
        super_dir, 'daligner-split', 'all-units-of-work.json')
    daligner_bash_template_fn = os.path.join(
        super_dir, 'daligner-split', 'daligner_bash_template.sh')
    params = dict(daligner_params)
    params['skip_checks'] = int(general_config.get('skip_checks', 0))
    params['wildcards'] = daligner_wildcard
    wf.addTask(gen_task(
        script=daligner_split_script,
        inputs={
            'config': general_config_fn,
            'db': db_fn,
            'length_cutoff': length_cutoff_fn,
        },
        outputs={
            'split': daligner_all_units_fn,
            'bash_template': daligner_bash_template_fn,
        },
        parameters=params,
        dist=Dist(local=True, NPROC=4),  # really, NPROC=1, but we need to know the max
    ))

    gathered_fn = os.path.join(super_dir, 'daligner-gathered', 'gathered-done-files.json')
    gen_parallel_tasks(
        wf,
        daligner_all_units_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=daligner_bash_template_fn,
            script=pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT,  # for snakemake stuff
            inputs={
                'units_of_work': os.path.join(super_dir, 'daligner-chunks/{%s}/some-units-of-work.json' % daligner_wildcard),
            },
            outputs={
                'results': os.path.join(super_dir, 'daligner-runs/{%s}/some-done-files.json' % daligner_wildcard),
            },
            parameters={},
        ),
        dist=Dist(NPROC=4, MB=4000, job_dict=daligner_job_config),
    )

    gathered_las_fn = os.path.join(super_dir, 'daligner-combine', 'gathered-las.json')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_DALIGNER_COMBINE_SCRIPT,
        inputs={
            'config': general_config_fn,
            'db': db_fn,
            'gathered': gathered_fn,
        },
        outputs={
            'las_paths': gathered_las_fn,
        },
        parameters={},
        #dist=Dist(NPROC=1, MB=4000, job_dict=daligner_job_config)
        dist=Dist(local=True),
    ))

    # Merge .las files.
    las_merge_all_units_fn = os.path.join(super_dir, 'las-merge-split', 'all-units-of-work.json')
    bash_template_fn = os.path.join(super_dir, 'las-merge-split', 'las-merge-bash-template.sh')
    params = dict(parameters)
    params['db_prefix'] = db_prefix
    params['wildcards'] = lamerge_wildcard
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_LAMERGE_SPLIT_SCRIPT,
        inputs={
            'config': general_config_fn,
            'las_paths': gathered_las_fn,
        },
        outputs={
            'split': las_merge_all_units_fn,
            'bash_template': bash_template_fn,
        },
        parameters=params,
        dist=Dist(local=True),
    ))

    gathered_fn = os.path.join(super_dir, 'las-merge-gathered', 'gathered.json')
    gen_parallel_tasks(
        wf,
        las_merge_all_units_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=bash_template_fn,
            script=pype_tasks.TASK_DB_LAMERGE_APPLY_SCRIPT,  # for snakemake
            inputs={
                'units_of_work': os.path.join(super_dir, 'las-merge-chunks/{%s}/some-units-of-work.json' % lamerge_wildcard),
            },
            outputs={
                'results': os.path.join(super_dir, 'las-merge-runs/{%s}/some-las-paths.json' % lamerge_wildcard),
            },
            parameters={},
        ),
        dist=Dist(NPROC=1, job_dict=merge_job_config),
    )

    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_LAMERGE_COMBINE_SCRIPT,
        inputs={
            'config': general_config_fn,
            'gathered': gathered_fn,
        },
        outputs={
            'block2las': p_id2las_fn,
            'las_paths': las_fofn_fn,
        },
        parameters={},
        dist=Dist(local=True),
    ))


def add_rep_tasks(
        wf,
        rawread_dir, config, general_config,
        general_config_fn, i_db_fn, length_cutoff_fn,
        letter, group_size, coverage_limit,
):
    """
    Add daligner/lamerge/REPmask parallel tasks for one iteration of repeat-masking.
    TODO: Make the tasks no-ops if the codes are zero (or something like that).
    """
    name = 'rep{}'.format(letter)
    rep_dir = os.path.join(rawread_dir, name)
    o_db_rep_fn = os.path.join(rep_dir, 'rep-combine', 'raw_reads.db')
    p_id2las_fn = os.path.join(rep_dir, 'las-merge-combine', 'p_id2las.json')
    las_fofn_fn = os.path.join(rep_dir, 'las-merge-combine', 'las_fofn.json')
    rep_daligner_params = dict(
        group_size=group_size, coverage_limit=coverage_limit,
    )
    add_daligner_and_merge_tasks(
        wf,
        general_config, config['job.step.da'], config['job.step.la'],
        rep_dir,
        general_config_fn, i_db_fn,
        length_cutoff_fn,
        p_id2las_fn, las_fofn_fn,
        daligner_wildcard='dal0{}_id'.format(letter),
        lamerge_wildcard='mer0{}_id'.format(letter),
        daligner_params=rep_daligner_params,
        db_prefix='raw_reads',  # TODO: Infer
        daligner_split_script=pype_tasks.TASK_DB_REP_DALIGNER_SPLIT_SCRIPT,
    )

    ### REPmask
    # rep-split
    # We assume that daligner/LAmerge have already run.
    # Instead of using the REP.mask calls from rep-jobs.05.MASK,
    # we construct our own.
    rep_uows_fn = os.path.join(
        rep_dir, 'rep-split', 'rep-uows.json')
    rep_bash_template_fn = os.path.join(
        rep_dir, 'rep-split', 'bash_template.sh')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_REP_SPLIT_SCRIPT,
        inputs={
            'config': general_config_fn,
            'db': i_db_fn,
            'las_paths': las_fofn_fn,
        },
        outputs={
            'split': rep_uows_fn,
            'bash_template': rep_bash_template_fn,
        },
        parameters={
            'group_size': group_size,
            'coverage_limit': coverage_limit,
            'wildcards': '{}_id'.format(name),
        },
        dist=Dist(NPROC=1),
    ))

    # rep-apply
    gathered_fn = os.path.join(rep_dir, 'rep-gathered', 'gathered-done-files.json')
    gen_parallel_tasks(
        wf,
        rep_uows_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=rep_bash_template_fn,
            script='fubar-TODO',  # pype_tasks.TASK_DB_REP_APPLY_SCRIPT, # for snakemake stuff
            inputs={
                'units_of_work': '0-rawreads/%(name)s/rep-chunks/{%(name)s_id}/some-units-of-work.json' % locals(),
            },
            outputs={
                'results': '0-rawreads/%(name)s/rep-runs/{%(name)s_id}/some-done-files.json' % locals(),
            },
            parameters={},
        ),
        dist=Dist(NPROC=4, MB=4000, job_dict=config['job.step.da']),
    )

    # rep-combine
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_REP_COMBINE_SCRIPT,
        inputs={
            'config': general_config_fn,
            'db': i_db_fn,
            'gathered': gathered_fn,
        },
        outputs={
            'new_db': o_db_rep_fn,
        },
        parameters={
            'group_size': group_size,
        },
        dist=Dist(local=True),
    ))
    return o_db_rep_fn


def main(argv=sys.argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('config',
                        help='.cfg/.ini/.json')
    parser.add_argument('logger',
                        nargs='?',
                        help='(Optional) JSON config for the standard Python logging module')
    args = parser.parse_args(argv[1:])
    main1(argv[0], args.config, args.logger)
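

# Example invocation (assuming this module is exposed as the 'fc_run'
# console entry-point; adjust to your install):
#   fc_run fc_run.cfg logging.json
# The second argument (a JSON logging config) is optional.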


if __name__ == '__main__':
    main()