- '''
- # DAMASKER options
- """
- Example config usage:
- pa_use_tanmask = true
- pa_use_repmask = true
- pa_HPCtanmask_option =
- pa_repmask_levels = 2
- pa_HPCrepmask_1_option = -g1 -c20 -mtan
- pa_HPCrepmask_2_option = -g10 -c15 -mtan -mrep1
- pa_damasker_HPCdaligner_option = -mtan -mrep1 -mrep10
- """
- pa_use_tanmask = False
- if config.has_option(section, 'pa_use_tanmask'):
- pa_use_tanmask = config.getboolean(section, 'pa_use_tanmask')
- pa_HPCtanmask_option = ""
- if config.has_option(section, 'pa_HPCtanmask_option'):
- pa_HPCtanmask_option = config.get(section, 'pa_HPCtanmask_option')
- pa_use_repmask = False
- if config.has_option(section, 'pa_use_repmask'):
- pa_use_repmask = config.getboolean(section, 'pa_use_repmask')
- pa_repmask_levels = 0 # REPmask tool can be used multiple times.
- if config.has_option(section, 'pa_repmask_levels'):
- pa_repmask_levels = config.getint(section, 'pa_repmask_levels')
- pa_HPCrepmask_1_option = """ -g1 -c20 -mtan"""
- if config.has_option(section, 'pa_HPCrepmask_1_option'):
- pa_HPCrepmask_1_option = config.get(section, 'pa_HPCrepmask_1_option')
- pa_HPCrepmask_2_option = """ -g10 -c15 -mtan -mrep1"""
- if config.has_option(section, 'pa_HPCrepmask_2_option'):
- pa_HPCrepmask_2_option = config.get(section, 'pa_HPCrepmask_2_option')
- pa_damasker_HPCdaligner_option = """ -mtan -mrep1 -mrep10""" # Repeat masks need to be passed to Daligner.
- if config.has_option(section, 'pa_damasker_HPCdaligner_option'):
- pa_damasker_HPCdaligner_option = config.get(
- section, 'pa_damasker_HPCdaligner_option')
- # End of DAMASKER options.
- # Note: We dump the 'length_cutoff' file for later reference within the preassembly report
- # of pbsmrtpipe HGAP.
- # However, it is not a true dependency because we still have a workflow that starts
- # from 'corrected reads' (preads), in which case build_db is not run.
- Example (combining per-block 'tan' track files into one mask track):
- Catrack -dfv raw_reads.db tan
- '''
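- # Editor's summary of the mask flow implemented below: HPC.TANmask emits
- # datander/TANmask units-of-work; Catrack combines the per-block files into a
- # 'tan' track. HPC.REPmask (via daligner + REPmask) likewise yields 'repN'
- # tracks. HPC.daligner is then passed '-mtan -mrepN' so the final overlap
- # jobs respect those masks.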
- import argparse
- import collections
- import glob
- import itertools
- import logging
- import os
- import re
- import sys
- import tarfile
- from ..util.io import yield_validated_fns
- from .. import io, functional
- from .. import (
- bash, # for write_sub_script
- pype_tasks, # for TASKS
- )
- LOG = logging.getLogger()
- WAIT = 20 # seconds to wait for file to exist
- def filter_DBsplit_option(opt):
- """Always add -a.
- If we want fewer reads, we rely on the fasta_filter.
- Also, add -x, as daligner belches on any read < kmer length.
- """
- flags = opt.split()
- if '-a' not in opt:
- flags.append('-a')
- if '-x' not in opt:
- flags.append('-x70') # daligner belches on any read < kmer length
- return ' '.join(flags)
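- # Doctest-style sketch of the filter above (illustrative; note the substring
- # checks on the raw option string):
- #   >>> filter_DBsplit_option('-x100 -s200')
- #   '-x100 -s200 -a'
- #   >>> filter_DBsplit_option('-s400')
- #   '-s400 -a -x70'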
- def script_build_db(config, input_fofn_fn, db):
- """
- db (e.g. 'raw_reads.db') will be output into CWD, should not already exist.
- 'dust' track will also be generated.
- """
- params = dict(config)
- try:
- cat_fasta = functional.choose_cat_fasta(open(input_fofn_fn).read())
- except Exception:
- LOG.exception('Using "cat" by default.')
- cat_fasta = 'cat '
- DBdust = 'DBdust {} {}'.format(config.get('DBdust_opt', ''), db)
- fasta_filter_option = config.get('fasta_filter_option', 'pass')
- subsample_coverage = config.get('subsample_coverage', 0)
- subsample_strategy = config.get('subsample_strategy', 'random')
- subsample_random_seed = config.get('subsample_random_seed', 0)
- genome_size = config.get('genome_size', 0)
- zmw_whitelist_option = ''
- use_subsampling = 0
- if subsample_coverage > 0:
- use_subsampling = 1
- zmw_whitelist_option = '--zmw-whitelist-fn zmw.whitelist.json'
- params.update(locals())
- script = """\
- echo "PBFALCON_ERRFILE=$PBFALCON_ERRFILE"
- set -o pipefail
- rm -f {db}.db {db}.dam .{db}.* # in case of re-run
- #fc_fasta2fasta < {input_fofn_fn} >| fc.fofn
- zmw_whitelist_option=""
- use_subsampling={use_subsampling}
- if [[ $use_subsampling -eq 1 ]]; then
- while read fn; do {cat_fasta} ${{fn}} | python3 -m falcon_kit.mains.zmw_collect; done < {input_fofn_fn} > zmw.all.tsv
- cat zmw.all.tsv | python3 -m falcon_kit.mains.zmw_subsample --coverage "{subsample_coverage}" --random-seed "{subsample_random_seed}" --strategy "{subsample_strategy}" --genome-size "{genome_size}" zmw.whitelist.json
- zmw_whitelist_option="--zmw-whitelist-fn zmw.whitelist.json"
- fi
- while read fn; do {cat_fasta} ${{fn}} | python3 -m falcon_kit.mains.fasta_filter ${{zmw_whitelist_option}} {fasta_filter_option} - | fasta2DB -v {db} -i${{fn##*/}}; done < {input_fofn_fn}
- #cat fc.fofn | xargs rm -f
- {DBdust}
- """.format(**params)
- return script
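- # Hedged usage sketch (key names from the .get() calls above; any key omitted
- # falls back to its default):
- #   cfg = {'DBdust_opt': '', 'fasta_filter_option': 'pass'}
- #   sh = script_build_db(cfg, 'input.fofn', 'raw_reads')
- # 'sh' is a bash fragment; build_db() below composes it with DBsplit and the
- # length-cutoff step, then runs the whole script.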
- def script_length_cutoff(config, db, length_cutoff_fn='length_cutoff'):
- params = dict(config)
- length_cutoff = config['user_length_cutoff']
- if int(length_cutoff) < 0:
- bash_cutoff = '$(python3 -m falcon_kit.mains.calc_cutoff --coverage {} {} <(DBstats -b1 {}))'.format(
- params['seed_coverage'], params['genome_size'], db)
- else:
- bash_cutoff = '{}'.format(length_cutoff)
- params.update(locals())
- return """
- CUTOFF={bash_cutoff}
- echo -n $CUTOFF >| {length_cutoff_fn}
- """.format(**params)
- def script_DBsplit(config, db):
- params = dict(config)
- params.update(locals())
- DBsplit_opt = filter_DBsplit_option(config['DBsplit_opt'])
- params.update(locals())
- return """
- DBsplit -f {DBsplit_opt} {db}
- #LB=$(cat {db}.db | LD_LIBRARY_PATH= awk '$1 == "blocks" {{print $3}}')
- #echo -n $LB >| db_block_count
- """.format(**params)
- def build_db(config, input_fofn_fn, db_fn, length_cutoff_fn):
- LOG.info('Building rdb from {!r}, to write {!r}'.format(
- input_fofn_fn, db_fn))
- db = os.path.splitext(db_fn)[0]
- # First, fix up the FOFN for this directory.
- my_input_fofn_fn = 'my.' + os.path.basename(input_fofn_fn)
- with open(my_input_fofn_fn, 'w') as stream:
- for fn in yield_validated_fns(input_fofn_fn):
- stream.write(fn)
- stream.write('\n')
- script = ''.join([
- script_build_db(config, my_input_fofn_fn, db),
- script_DBsplit(config, db),
- script_length_cutoff(config, db, length_cutoff_fn),
- ])
- script_fn = 'build_db.sh'
- with open(script_fn, 'w') as ofs:
- exe = bash.write_sub_script(ofs, script)
- io.syscall('bash -vex {}'.format(script_fn))
- def script_HPC_daligner(config, db, length_cutoff_fn, tracks, prefix):
- params = dict(config)
- masks = ' '.join('-m{}'.format(track) for track in tracks)
- params.update(locals())
- symlink(length_cutoff_fn, 'CUTOFF')
- return """
- #LB=$(cat db_block_count)
- CUTOFF=$(cat CUTOFF)
- rm -f daligner-jobs.*
- echo "SMRT_PYTHON_PATH_PREPEND=$SMRT_PYTHON_PATH_PREPEND"
- echo "PATH=$PATH"
- which HPC.daligner
- HPC.daligner -P. {daligner_opt} {masks} -H$CUTOFF -f{prefix} {db}
- """.format(**params)
- def script_HPC_TANmask(config, db, tracks, prefix):
- assert prefix and '/' not in prefix
- params = dict(config)
- #masks = ' '.join('-m{}'.format(track) for track in tracks)
- params.update(locals())
- return """
- rm -f {prefix}.*
- rm -f .{db}.*.tan.anno
- rm -f .{db}.*.tan.data
- echo "SMRT_PYTHON_PATH_PREPEND=$SMRT_PYTHON_PATH_PREPEND"
- echo "PATH=$PATH"
- which HPC.TANmask
- HPC.TANmask -P. {TANmask_opt} -v -f{prefix} {db}
- """.format(**params)
- def script_HPC_REPmask(config, db, tracks, prefix, group_size, coverage_limit):
- if group_size == 0: # TODO: Make this a no-op.
- group_size = 1
- coverage_limit = 10**9 # an arbitrary large number
- assert prefix and '/' not in prefix
- params = dict(config)
- masks = ' '.join('-m{}'.format(track) for track in tracks)
- params.update(locals())
- return """
- rm -f {prefix}.*
- rm -f .{db}.*.rep*.anno
- rm -f .{db}.*.rep*.data
- echo "SMRT_PYTHON_PATH_PREPEND=$SMRT_PYTHON_PATH_PREPEND"
- echo "PATH=$PATH"
- which HPC.REPmask
- HPC.REPmask -P. -g{group_size} -c{coverage_limit} {REPmask_opt} {masks} -v -f{prefix} {db}
- """.format(**params)
- def symlink(actual, symbolic=None, force=True):
- """Symlink into cwd, relatively.
- symbolic name is basename(actual) if not provided.
- If not force, raise when already exists and does not match.
- But ignore symlink to self.
- """
- symbolic = os.path.basename(actual) if not symbolic else symbolic
- if os.path.abspath(actual) == os.path.abspath(symbolic):
- LOG.warning('Cannot symlink {!r} as {!r}, itself.'.format(actual, symbolic))
- return
- rel = os.path.relpath(actual)
- if force:
- LOG.info('ln -sf {} {}'.format(rel, symbolic))
- if os.path.lexists(symbolic):
- if os.readlink(symbolic) == rel:
- return
- else:
- os.unlink(symbolic)
- else:
- LOG.info('ln -s {} {}'.format(rel, symbolic))
- if os.path.lexists(symbolic):
- if os.readlink(symbolic) != rel:
- msg = '{!r} already exists as {!r}, not {!r}'.format(
- symbolic, os.readlink(symbolic), rel)
- raise Exception(msg)
- else:
- LOG.info('{!r} already points to {!r}'.format(symbolic, rel))
- return
- os.symlink(rel, symbolic)
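- # Hedged usage sketch: symlink('/data/db/raw_reads.db') creates a relative
- # link './raw_reads.db'; with force=False an existing link to a different
- # target raises, while a matching link is left alone.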
- def find_db(db):
- if db.endswith('.db') or db.endswith('.dam'):
- if not os.path.exists(db):
- raise Exception('DB "{}" does not exist.'.format(db))
- return db
- db_fn = db + '.db'
- if os.path.exists(db_fn):
- return db_fn
- db_fn = db + '.dam'
- if os.path.exists(db_fn):
- return db_fn
- raise Exception('Could not find DB "{}"'.format(db))
- def symlink_db(db_fn, symlink=symlink):
- """Symlink (into cwd) everything that could be related to this Dazzler DB.
- Exact matches will probably cause an exception in symlink().
- """
- if not os.path.exists(db_fn):
- db_fn = find_db(db_fn)
- db_dirname, db_basename = os.path.split(os.path.normpath(db_fn))
- if not db_dirname:
- return # must be here already
- dbname, suffix = os.path.splitext(db_basename)
- # Note: could be .db or .dam
- fn = os.path.join(db_dirname, dbname + suffix)
- symlink(fn)
- re_suffix = re.compile(r'^\.%s(\.idx|\.bps|\.dust\.data|\.dust\.anno|\.tan\.data|\.tan\.anno|\.rep\d+\.data|\.rep\d+\.anno)$'%dbname)
- all_basenames = os.listdir(db_dirname)
- for basename in sorted(all_basenames):
- mo = re_suffix.search(basename)
- if not mo:
- continue
- fn = os.path.join(db_dirname, basename)
- if os.path.exists(fn):
- symlink(fn)
- else:
- LOG.warning('Symlink {!r} seems to be broken.'.format(fn))
- return dbname
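- # Hedged sketch: symlink_db('../build/raw_reads.db') symlinks raw_reads.db
- # plus its dot-files (.raw_reads.idx, .raw_reads.bps, and any dust/tan/repN
- # track files matched by re_suffix above) into CWD and returns 'raw_reads'.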
- def tan_split(config, config_fn, db_fn, uows_fn, bash_template_fn):
- with open(bash_template_fn, 'w') as stream:
- stream.write(pype_tasks.TASK_DB_TAN_APPLY_SCRIPT)
- # TANmask would put track-files in the DB-directory, not '.',
- # so we need to symlink everything first.
- symlink_db(db_fn)
- db = os.path.splitext(db_fn)[0]
- dbname = os.path.basename(db)
- tracks = get_tracks(db_fn)
- # We assume the actual DB will be symlinked here.
- base_db = os.path.basename(db)
- script = ''.join([
- script_HPC_TANmask(config, base_db, tracks, prefix='tan-jobs'),
- ])
- script_fn = 'split.sh'
- with open(script_fn, 'w') as ofs:
- exe = bash.write_sub_script(ofs, script)
- io.syscall('bash -vex {}'.format(script_fn))
- # We now have files like tan-jobs.01.OVL
- # We need to parse that one. (We ignore the others.)
- lines = open('tan-jobs.01.OVL').readlines()
- re_block = re.compile(r'{}(\.\d+|)'.format(dbname))
- def get_blocks(line):
- """Return ['.1', '.2', ...]
- """
- return [mo.group(1) for mo in re_block.finditer(line)]
- scripts = list()
- for line in lines:
- if line.startswith('#'):
- continue
- if not line.strip():
- continue
- blocks = get_blocks(line)
- assert blocks, 'No blocks found in {!r} from {!r}'.format(line, 'tan-jobs.01.OVL')
- las_files = ' '.join('TAN.{db}{block}.las'.format(db=dbname, block=block) for block in blocks)
- # We assume the actual DB will be symlinked here.
- base_db = os.path.basename(db)
- script_lines = [
- line,
- 'LAcheck {} {}\n'.format(base_db, las_files),
- 'TANmask {} {}\n'.format(base_db, las_files),
- 'rm -f {}\n'.format(las_files),
- ]
- if [''] == blocks:
- # Special case: if the DB has only 1 block, HPC.TANmask omits the block-number.
- # (Even with multiple blocks, a single line can still cover only 1 block, so we
- # look specifically for a solitary block that is ''.) We then rename the
- # track-files so they look as if they had been block-numbered in the first place.
- script_lines.append('mv .{db}.tan.data .{db}.1.tan.data\n'.format(db=dbname))
- script_lines.append('mv .{db}.tan.anno .{db}.1.tan.anno\n'.format(db=dbname))
- scripts.append(''.join(script_lines))
- jobs = list()
- uow_dirs = list()
- for i, script in enumerate(scripts):
- job_id = 'tan_{:03d}'.format(i)
- script_dir = os.path.join('.', 'tan-scripts', job_id)
- script_fn = os.path.join(script_dir, 'run_datander.sh')
- io.mkdirs(script_dir)
- with open(script_fn, 'w') as stream:
- stream.write('{}\n'.format(script))
- # Record in a job-dict.
- job = dict()
- job['input'] = dict(
- config=config_fn,
- db=db_fn,
- script=script_fn,
- )
- job['output'] = dict(
- job_done='job.done'
- )
- job['params'] = dict(
- )
- job['wildcards'] = dict(
- tan0_id=job_id,
- )
- jobs.append(job)
- # Write into a uow directory.
- uow_dn = 'uow-{:04d}'.format(i)
- io.mkdirs(uow_dn)
- with io.cd(uow_dn):
- script_fn = 'uow.sh'
- with open(script_fn, 'w') as stream:
- stream.write(script)
- # Add symlinks.
- symlink_db(os.path.join('..', base_db))
- uow_dirs.append(uow_dn)
- io.serialize(uows_fn, jobs)
- # For Cromwell, we use a tar-file instead.
- move_into_tar('all-units-of-work', uow_dirs)
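- # For reference, each serialized unit-of-work above has roughly this JSON
- # shape (editor's sketch; paths abbreviated):
- #   {"input": {"config": "...", "db": "...", "script": "./tan-scripts/tan_000/run_datander.sh"},
- #    "output": {"job_done": "job.done"},
- #    "params": {},
- #    "wildcards": {"tan0_id": "tan_000"}}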
- def tan_apply(db_fn, script_fn, job_done_fn):
- # datander would put track-files in the DB-directory, not '.',
- # so we need to symlink everything first.
- db = symlink_db(db_fn)
- symlink(script_fn)
- io.syscall('bash -vex {}'.format(os.path.basename(script_fn)))
- io.touch(job_done_fn)
- def track_combine(db_fn, track, anno_fofn_fn, data_fofn_fn):
- # track is typically 'tan' or 'repN'.
- symlink_db(db_fn)
- db = os.path.splitext(db_fn)[0]
- dbname = os.path.basename(db)
- def symlink_unprefixed(fn):
- bn = os.path.basename(fn)
- if bn.startswith('dot'):
- bn = bn[3:]
- return symlink(fn, bn)
- # Inputs will be symlinked into CWD, sans our prefix (assumed to be "dot").
- # Note: we cannot use "validated" yielders b/c these can be zero-size.
- for (i, pair) in enumerate(itertools.zip_longest(
- io.yield_abspath_from_fofn(anno_fofn_fn),
- io.yield_abspath_from_fofn(data_fofn_fn))):
- (anno_fn, data_fn) = pair
- assert anno_fn is not None and data_fn is not None, 'anno/data FOFNs must be 1:1: {!r}'.format(pair)
- symlink_unprefixed(anno_fn)
- symlink_unprefixed(data_fn)
- cmd = 'Catrack -vdf {} {}'.format(dbname, track)
- LOG.info(cmd)
- io.syscall(cmd)
- def tan_combine(db_fn, gathered_fn, new_db_fn):
- new_db = os.path.splitext(new_db_fn)[0]
- db = symlink_db(db_fn)
- assert db == new_db, 'basename({!r})!={!r}, but old and new DB names must match.'.format(db_fn, new_db_fn)
- # Remove old, in case of resume.
- io.syscall('rm -f .{db}.*.tan.anno .{db}.*.tan.data'.format(db=db))
- gathered = io.deserialize(gathered_fn)
- gathered_dn = os.path.dirname(gathered_fn)
- # Create symlinks for all track-files.
- for job in gathered:
- done_fn = job['job_done']
- done_dn = os.path.dirname(done_fn)
- if not os.path.isabs(done_dn):
- LOG.info('Found relative done-file: {!r}'.format(done_fn))
- done_dn = os.path.join(gathered_dn, done_dn)
- annos = glob.glob('{}/.{}.*.tan.anno'.format(done_dn, db))
- datas = glob.glob('{}/.{}.*.tan.data'.format(done_dn, db))
- assert len(annos) == len(datas), 'Mismatched globs:\n{!r}\n{!r}'.format(annos, datas)
- for fn in annos + datas:
- symlink(fn, force=False)
- cmd = 'Catrack -vdf {} tan'.format(db)
- io.syscall(cmd)
- def rep_split(config, config_fn, db_fn, las_paths_fn, wildcards, group_size, coverage_limit, uows_fn, bash_template_fn):
- """For foo.db, HPC.REPmask would produce rep-jobs.05.MASK lines like this:
- # REPmask jobs (n)
- REPmask -v -c30 -nrep1 foo foo.R1.@1-3
- REPmask -v -c30 -nrep1 foo foo.R1.@4-6
- ...
- (That's for level R1.)
- We will do one block at-a-time, for simplicity.
- """
- with open(bash_template_fn, 'w') as stream:
- stream.write(pype_tasks.TASK_DB_REP_APPLY_SCRIPT)
- db = symlink_db(db_fn)
- las_paths = io.deserialize(las_paths_fn)
- scripts = list()
- for i, las_fn in enumerate(las_paths):
- las_files = las_fn # one at-a-time
- # We assume the actual DB will be symlinked here.
- base_db = os.path.basename(db)
- script_lines = [
- #'LAcheck {} {}\n'.format(db, las_files),
- 'REPmask -v -c{} -nrep{} {} {}\n'.format(
- coverage_limit, group_size, base_db, las_files),
- 'rm -f {}\n'.format(las_files),
- ]
- scripts.append(''.join(script_lines))
- jobs = list()
- for i, script in enumerate(scripts):
- job_id = 'rep_{:03d}'.format(i)
- script_dir = os.path.join('.', 'rep-scripts', job_id)
- script_fn = os.path.join(script_dir, 'run_REPmask.sh')
- io.mkdirs(script_dir)
- with open(script_fn, 'w') as stream:
- stream.write('{}\n'.format(script))
- # Record in a job-dict.
- job = dict()
- job['input'] = dict(
- config=config_fn,
- db=db_fn,
- script=script_fn,
- )
- job['output'] = dict(
- job_done='job.done'
- )
- job['params'] = dict(
- )
- job['wildcards'] = {wildcards: job_id}
- jobs.append(job)
- io.serialize(uows_fn, jobs)
- def rep_apply(db_fn, script_fn, job_done_fn):
- # daligner would put track-files in the DB-directory, not '.',
- # so we need to symlink everything first.
- db = symlink_db(db_fn)
- symlink(script_fn)
- io.syscall('bash -vex {}'.format(os.path.basename(script_fn)))
- io.touch(job_done_fn)
- def rep_combine(db_fn, gathered_fn, group_size, new_db_fn):
- new_db = os.path.splitext(new_db_fn)[0]
- db = symlink_db(db_fn)
- assert db == new_db, 'basename({!r})!={!r}, but old and new DB names must match.'.format(db_fn, new_db_fn)
- # Remove old, in case of resume.
- io.syscall('rm -f .{db}.*.rep{group_size}.anno .{db}.*.rep{group_size}.data'.format(**locals()))
- gathered = io.deserialize(gathered_fn)
- gathered_dn = os.path.dirname(gathered_fn)
- # Create symlinks for all track-files.
- for job in gathered:
- done_fn = job['job_done']
- done_dn = os.path.dirname(done_fn)
- if not os.path.isabs(done_dn):
- LOG.info('Found relative done-file: {!r}'.format(done_fn))
- done_dn = os.path.join(gathered_dn, done_dn)
- annos = glob.glob('{}/.{}.*.rep{}.anno'.format(done_dn, db, group_size))
- datas = glob.glob('{}/.{}.*.rep{}.data'.format(done_dn, db, group_size))
- assert len(annos) == len(datas), 'Mismatched globs:\n{!r}\n{!r}'.format(annos, datas)
- for fn in annos + datas:
- symlink(fn, force=False)
- cmd = 'Catrack -vdf {} rep{}'.format(db, group_size)
- io.syscall(cmd)
- '''
- daligner -v -k18 -w8 -h480 -e0.8 -P. /localdisk/scratch/cdunn/repo/FALCON-examples/run/greg200k-sv2/0-rawreads/tan-combine/raw_reads /localdisk/scratch/cdunn/repo/FALCON-examples/run/greg200k-sv2/0-rawreads/tan-combine/raw_reads && mv raw_reads.raw_reads.las raw_reads.R1.1.las
- '''
- def fake_rep_as_daligner_script_moved(script, dbname):
- """
- Special case:
- # Daligner jobs (1)
- daligner raw_reads raw_reads && mv raw_reads.raw_reads.las raw_reads.R1.1.las
- Well, unlike for daligner_split, here the block-number is there for this degenerate case. Good!
- """
- """
- We have db.Rn.block.las
- We want db.block.db.block.las, for now. (We will 'merge' this solo file later.)
- """
- re_script = re.compile(r'(mv\b.*\S+\s+)(\S+)$') # no trailing newline, for now
- mo = re_script.search(script)
- if not mo:
- msg = 'Only 1 line in rep-jobs.01.OVL, but\n {!r} did not match\n {!r}.'.format(
- re_script.pattern, script)
- LOG.warning(msg)
- return script
- else:
- new_script = re_script.sub(r'\1{dbname}.1.{dbname}.1.las'.format(dbname=dbname), script, 1)
- msg = 'Only 1 line in rep-jobs.01.OVL:\n {!r} matches\n {!r}. Replacing with\n {!r}.'.format(
- re_script.pattern, script, new_script)
- LOG.warning(msg)
- return new_script
- def fake_rep_as_daligner_script_unmoved(script, dbname):
- """
- Typical case:
- # Daligner jobs (N)
- daligner raw_reads raw_reads && mv raw_reads.3.raw_reads.3.las raw_reads.R1.3.las
- Well, unlike for daligner_split, here the block-number is there for this degenerate case. Good!
- """
- """
- We have db.Rn.block.las
- We want db.block.db.block.las. (We will merge later.)
- """
- re_script = re.compile(r'\s*\&\&\s*mv\s+.*$') # no trailing newline, for now
- mo = re_script.search(script)
- if not mo:
- msg = 'Many lines in rep-jobs.01.OVL, but\n {!r} did not match\n {!r}.'.format(
- re_script.pattern, script)
- LOG.warning(msg)
- return script
- else:
- new_script = re_script.sub('', script, 1)
- msg = 'Many lines in rep-jobs.01.OVL:\n {!r} matches\n {!r}. Replacing with\n {!r}.'.format(
- re_script.pattern, script, new_script)
- LOG.warning(msg)
- return new_script
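- # Doctest-style sketch of the rewrite above (pure string transform; the
- # LOG.warning goes to stderr):
- #   >>> s = 'daligner raw_reads.3 raw_reads.1 && mv raw_reads.3.raw_reads.1.las raw_reads.R1.3.las'
- #   >>> fake_rep_as_daligner_script_unmoved(s, 'raw_reads')
- #   'daligner raw_reads.3 raw_reads.1'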
- def _get_rep_daligner_split_noop_scripts(db_fn):
- """
- We need code to generate an empty .las file for each block.
- We do all in the same script to reduce the number of qsub calls for each iteration.
- (We cannot generate only 1 block because Catrack expects all N.)
- """
- with open(db_fn) as stream:
- nblocks = functional.dazzler_get_nblocks(stream)
- LOG.debug('Found {} blocks in DB {!r}'.format(nblocks, db_fn))
- db = os.path.splitext(db_fn)[0]
- dbname = os.path.basename(db)
- lines = []
- for i in range(1, nblocks+1):
- las_fn = '{db}.{i}.{db}.{i}.las'.format(db=dbname, i=i)
- lines.append('python3 -m falcon_kit.mains.las_write_empty {}'.format(las_fn))
- script = '\n'.join(lines)
- return [script]
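- # For a 2-block DB named raw_reads, the single returned script would be (sketch):
- #   python3 -m falcon_kit.mains.las_write_empty raw_reads.1.raw_reads.1.las
- #   python3 -m falcon_kit.mains.las_write_empty raw_reads.2.raw_reads.2.las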
- def _get_rep_daligner_split_scripts(config, db_fn, group_size, coverage_limit):
- db = os.path.splitext(db_fn)[0]
- dbname = os.path.basename(db)
- tracks = get_tracks(db_fn)
- # First, run HPC.REPmask immediately.
- script = ''.join([
- script_HPC_REPmask(config, db, tracks,
- prefix='rep-jobs', group_size=group_size, coverage_limit=coverage_limit),
- ])
- script_fn = 'split_db.sh'
- with open(script_fn, 'w') as ofs:
- exe = bash.write_sub_script(ofs, script)
- io.syscall('bash -vex {}'.format(script_fn))
- # We now have files like rep-jobs.01.OVL
- # We need to parse that one. (We ignore the others.)
- lines = open('rep-jobs.01.OVL').readlines()
- scripts = list()
- for line in lines:
- if line.startswith('#'):
- continue
- if not line.strip():
- continue
- scripts.append(line)
- if len(scripts) == 1:
- scripts = [fake_rep_as_daligner_script_moved(s, dbname) for s in scripts]
- else:
- scripts = [fake_rep_as_daligner_script_unmoved(s, dbname) for s in scripts]
- for i, script in enumerate(scripts):
- LAcheck = 'LAcheck -vS {} *.las'.format(db)
- script += '\n' + LAcheck + '\n'
- scripts[i] = script
- return scripts
- def rep_daligner_split(config, config_fn, db_fn, nproc, wildcards, group_size, coverage_limit, uows_fn, bash_template_fn):
- """Similar to daligner_split(), but based on HPC.REPmask instead of HPC.daligner.
- """
- with open(bash_template_fn, 'w') as stream:
- stream.write(pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT)
- symlink_db(db_fn)
- if group_size == 0:
- scripts = _get_rep_daligner_split_noop_scripts(os.path.basename(db_fn))
- else:
- scripts = _get_rep_daligner_split_scripts(config, os.path.basename(db_fn), group_size, coverage_limit)
- jobs = list()
- for i, script in enumerate(scripts):
- job_id = 'rep_{:04d}'.format(i)
- script_dir = os.path.join('.', 'rep-scripts', job_id)
- script_fn = os.path.join(script_dir, 'run_daligner.sh')
- io.mkdirs(script_dir)
- with open(script_fn, 'w') as stream:
- stream.write('{}\n'.format(script))
- # Record in a job-dict.
- job = dict()
- job['input'] = dict(
- config=config_fn,
- db=db_fn,
- script=script_fn,
- )
- job['output'] = dict(
- job_done='job.done'
- )
- job['params'] = dict(
- )
- job['wildcards'] = {wildcards: job_id}
- jobs.append(job)
- io.serialize(uows_fn, jobs)
- def get_tracks(db_fn):
- db_dirname, db_basename = os.path.split(db_fn)
- if not db_dirname:
- db_dirname = '.' # otherwise the glob below would search '/'
- dbname = os.path.splitext(db_basename)[0]
- fns = glob.glob('{}/.{}.*.anno'.format(db_dirname, dbname))
- re_anno = re.compile(r'\.{}\.([^\.]+)\.anno'.format(dbname))
- tracks = [re_anno.search(fn).group(1) for fn in fns]
- return tracks
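- # Hedged sketch: with combined track files like .raw_reads.dust.anno and
- # .raw_reads.tan.anno beside raw_reads.db, get_tracks('raw_reads.db')
- # returns ['dust', 'tan'] (glob order, unsorted).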
- def daligner_split(config, config_fn, db_fn, nproc, wildcards, length_cutoff_fn, split_fn, bash_template_fn):
- with open(bash_template_fn, 'w') as stream:
- stream.write(pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT)
- symlink_db(db_fn)
- db = os.path.splitext(db_fn)[0]
- dbname = os.path.basename(db)
- tracks = get_tracks(db_fn)
- # We assume the actual DB will be symlinked here.
- base_db = os.path.basename(db)
- script = ''.join([
- script_HPC_daligner(config, base_db, length_cutoff_fn, tracks, prefix='daligner-jobs'),
- ])
- script_fn = 'split_db.sh'
- with open(script_fn, 'w') as ofs:
- exe = bash.write_sub_script(ofs, script)
- io.syscall('bash -vex {}'.format(script_fn))
- # We now have files like daligner-jobs.01.OVL
- # We need to parse that one. (We ignore the others.)
- lines = open('daligner-jobs.01.OVL').readlines()
- preads_aln = (dbname == 'preads')
- xformer = functional.get_script_xformer(preads_aln)
- LOG.debug('preads_aln={!r} (True => use daligner_p)'.format(preads_aln))
- scripts = list()
- for line in lines:
- if line.startswith('#'):
- continue
- if not line.strip():
- continue
- line = xformer(line) # Use daligner_p for preads.
- scripts.append(line)
- """
- Special case:
- # Daligner jobs (1)
- daligner raw_reads raw_reads && mv raw_reads.raw_reads.las raw_reads.las
- In that case, the "block" name is empty. (See functional.py)
- We will rename the file. (LAmerge on a single input is a no-op, which is fine.)
- """
- if len(scripts) == 1:
- script = scripts[0]
- re_script = re.compile(r'(mv\b.*\S+\s+)(\S+)$') # no trailing newline, for now
- mo = re_script.search(script)
- if not mo:
- msg = 'Only 1 line in daligner-jobs.01.OVL, but\n {!r} did not match\n {!r}.'.format(
- re_script.pattern, script)
- LOG.warning(msg)
- else:
- new_script = re_script.sub(r'\1{dbname}.1.{dbname}.1.las'.format(dbname=dbname), script, 1)
- msg = 'Only 1 line in daligner-jobs.01.OVL:\n {!r} matches\n {!r}. Replacing with\n {!r}.'.format(
- re_script.pattern, script, new_script)
- LOG.warning(msg)
- scripts = [new_script]
- for i, script in enumerate(scripts):
- LAcheck = 'LAcheck -vS {} *.las'.format(base_db)
- script += '\n' + LAcheck + '\n'
- scripts[i] = script
- jobs = list()
- uow_dirs = list()
- for i, script in enumerate(scripts):
- job_id = 'j_{:04d}'.format(i)
- script_dir = os.path.join('.', 'daligner-scripts', job_id)
- # Write for job-dict.
- script_fn = os.path.join(script_dir, 'run_daligner.sh')
- io.mkdirs(script_dir)
- with open(script_fn, 'w') as stream:
- stream.write(script)
- # Record in a job-dict.
- job = dict()
- job['input'] = dict(
- config=config_fn,
- db=db_fn,
- script=script_fn,
- )
- job['output'] = dict(
- job_done='daligner.done'
- )
- job['params'] = dict(
- )
- job['wildcards'] = {wildcards: job_id}
- jobs.append(job)
- # Write into a uow directory.
- uow_dn = 'uow-{:04d}'.format(i)
- io.mkdirs(uow_dn)
- with io.cd(uow_dn):
- script_fn = 'uow.sh'
- with open(script_fn, 'w') as stream:
- stream.write(script)
- # Add symlinks.
- symlink_db(os.path.join('..', base_db))
- uow_dirs.append(uow_dn)
- io.serialize(split_fn, jobs)
- # For Cromwell, we use a tar-file instead.
- move_into_tar('all-units-of-work', uow_dirs)
- def move_into_tar(dn, fns):
- """Create directory 'dn'; move the given files (or dir-trees) into it; tar it; then remove the directory.
- By convention, for tar-file "foo.tar", we first move everything into a directory named "foo".
- """
- io.mkdirs(dn)
- for fn in fns:
- cmd = 'mv {} {}'.format(fn, dn)
- io.syscall(cmd)
- tar_fn = '{}.tar'.format(dn)
- #with tarfile.TarFile(tar_fn, 'w', dereference=False, ignore_zeros=True, errorlevel=2) as tf:
- # tf.add(dn)
- cmd = 'tar cvf {} {}'.format(tar_fn, dn)
- io.syscall(cmd)
- io.rmdirs(dn)
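- # Hedged usage sketch: move_into_tar('all-units-of-work', ['uow-0000', 'uow-0001'])
- # leaves 'all-units-of-work.tar' in CWD (containing the 'all-units-of-work/'
- # directory) and removes the staging directory itself.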
- def daligner_apply(db_fn, script_fn, job_done_fn):
- symlink(script_fn)
- symlink_db(db_fn)
- io.syscall('bash -vex {}'.format(os.path.basename(script_fn)))
- io.touch(job_done_fn)
- class MissingLas(Exception):
- pass
- def is_perfect_square(n):
- import math
- root = round(math.sqrt(n))
- return n == root*root
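- # Doctest-style sketch; daligner on N blocks nominally yields N*N pair-files,
- # which is presumably why find_las_paths() below (almost) checks this:
- #   >>> is_perfect_square(9)
- #   True
- #   >>> is_perfect_square(8)
- #   False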
- def daligner_combine(db_fn, gathered_fn, las_paths_fn):
- """Merge all .las pair-files from gathered daligner runs.
- Write simple las_paths_fn and archaic p_id2las_fn.
- """
- gathered = io.deserialize(gathered_fn)
- d = os.path.abspath(os.path.realpath(os.path.dirname(gathered_fn)))
- def abspath(fn):
- if os.path.isabs(fn):
- return fn # I expect this never to happen though.
- return os.path.join(d, fn)
- job_done_fns = list()
- for job_output in gathered:
- for fn in job_output.values():
- abs_fn = abspath(fn)
- job_done_fns.append(abs_fn)
- #import pprint
- #LOG.info('job_done_fns: {}'.format(pprint.pformat(job_done_fns)))
- job_rundirs = sorted(os.path.dirname(fn) for fn in job_done_fns)
- import time
- def find_las_paths():
- las_paths = list()
- for uow_dir in job_rundirs:
- # We could assert the existence of a job_done file here.
- d = os.path.abspath(uow_dir)
- las_path_glob = glob.glob(os.path.join(d, '*.las'))
- LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
- if not las_path_glob:
- LOG.info('No .las found in {!r}. Sleeping {} seconds before retrying.'.format(d, WAIT))
- time.sleep(WAIT)
- las_path_glob = glob.glob(os.path.join(d, '*.las'))
- LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
- if not las_path_glob:
- msg = 'No .las files found in daligner dir {!r}.'.format(d)
- raise MissingLas(msg)
- las_paths.extend(las_path_glob)
- n = len(las_paths)
- if not is_perfect_square(n):
- #msg = '{} is not a perfect square. We must be missing some .las files.'.format(n)
- #raise MissingLas(msg)
- pass
- return las_paths
- try:
- las_paths = sorted(find_las_paths(), key=lambda fn: os.path.basename(fn))
- except MissingLas as exc:
- LOG.exception('Not enough .las found from {!r}. Sleeping {} seconds before retrying.'.format(
- gathered_fn, WAIT))
- time.sleep(WAIT)
- las_paths = sorted(find_las_paths(), key=lambda fn: os.path.basename(fn))
- io.serialize(las_paths_fn, las_paths)
- def merge_split(config_fn, dbname, las_paths_fn, wildcards, split_fn, bash_template_fn):
- with open(bash_template_fn, 'w') as stream:
- stream.write(pype_tasks.TASK_DB_LAMERGE_APPLY_SCRIPT)
- las_paths = io.deserialize(las_paths_fn)
- re_las_pair = re.compile(r'{db}\.(\d+)\.{db}\.(\d+)\.las$'.format(db=dbname))
- las_map = collections.defaultdict(list)
- for path in las_paths:
- mo = re_las_pair.search(path)
- if not mo:
- msg = '{!r} does not match regex {!r}'.format(
- path, re_las_pair.pattern)
- raise Exception(msg)
- a, b = int(mo.group(1)), int(mo.group(2))
- las_map[a].append(path)
- jobs = list()
- for i, block in enumerate(sorted(las_map)): # sort for deterministic job numbering
- job_id = 'm_{:05d}'.format(i)
- # Write the las files for this job.
- input_dir = os.path.join('merge-scripts', job_id)
- job_las_paths_fn = os.path.join('.', input_dir, 'las-paths.json') # avoid shadowing las_paths_fn
- io.mkdirs(input_dir)
- job_las_paths = las_map[block]
- io.serialize(job_las_paths_fn, job_las_paths)
- # Record in a job-dict.
- las_fn = '{}.{}.las'.format(dbname, block)
- job = dict()
- job['input'] = dict(
- config=config_fn,
- #db=db_fn,
- #script=os.path.abspath(script_fn),
- las_paths=job_las_paths_fn,
- )
- job['output'] = dict(
- #job_done = 'daligner.done'
- las_fn=las_fn
- )
- job['params'] = dict(
- )
- job['wildcards'] = {wildcards: job_id}
- jobs.append(job)
- io.serialize(split_fn, jobs)
- def ichunked(seq, chunksize):
- """Yields items from an iterator in iterable chunks.
- https://stackoverflow.com/a/1335572
- """
- from itertools import chain, islice
- try:
- it = iter(seq)
- while True:
- yield chain([next(it)], islice(it, chunksize-1))
- except StopIteration:
- return
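- # Doctest-style sketch (each yielded chunk is a lazy iterator over the
- # source, so consume one chunk fully before requesting the next):
- #   >>> [list(c) for c in ichunked(range(5), 2)]
- #   [[0, 1], [2, 3], [4]]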
- def merge_apply(las_paths_fn, las_fn):
- """Merge the las files into one, a few at a time.
- This replaces the logic of HPC.daligner.
- """
- io.rm_force(las_fn)
- #all_las_paths = rel_to(io.deserialize(las_paths_fn), os.path.dirname(las_paths_fn))
- all_las_paths = io.deserialize(las_paths_fn)
- # Create symlinks, so system calls will be shorter.
- all_syms = list()
- for fn in all_las_paths:
- symlink(fn)
- all_syms.append(os.path.basename(fn))
- curr_paths = sorted(all_syms)
- # Merge a few at-a-time.
- at_a_time = 250 # max is 252 for LAmerge
- level = 1
- while len(curr_paths) > 1:
- level += 1
- next_paths = list()
- for i, paths in enumerate(ichunked(curr_paths, at_a_time)):
- tmp_las = 'L{}.{}.las'.format(level, i+1)
- paths_arg = ' '.join(paths)
- cmd = 'LAmerge -v {} {}'.format(tmp_las, paths_arg)
- io.syscall(cmd)
- next_paths.append(tmp_las)
- curr_paths = next_paths
- # Save only the one we want.
- io.syscall('mv -f {} {}'.format(curr_paths[0], 'keep-this'))
- io.syscall('rm -f *.las')
- io.syscall('mv -f {} {}'.format('keep-this', las_fn))
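- # Worked sketch of the loop above: with 600 input .las files and
- # at_a_time=250, level 2 produces L2.1.las (250 inputs), L2.2.las (250), and
- # L2.3.las (100); level 3 merges those three into L3.1.las, which survives
- # the 'rm -f *.las' via the extension-less 'keep-this' name and becomes las_fn.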
- def merge_combine(gathered_fn, las_paths_fn, block2las_fn):
- gathered = io.deserialize(gathered_fn)
- d = os.path.abspath(os.path.realpath(os.path.dirname(gathered_fn)))
- def abspath(fn):
- if os.path.isabs(fn):
- return fn # I expect this never to happen though.
- return os.path.join(d, fn)
- las_fns = list()
- for job_output in gathered:
- assert len(job_output) == 1, 'len(job_output) == {} != 1'.format(len(job_output))
- for fn in list(job_output.values()):
- abs_fn = abspath(fn)
- las_fns.append(abs_fn)
- #import pprint
- #LOG.info('job_done_fns: {}'.format(pprint.pformat(job_done_fns)))
- import time
- #job_rundirs = sorted(os.path.dirname(fn) for fn in job_done_fns)
- #las_paths = list()
- #for uow_dir in job_rundirs:
- # # We could assert the existence of a job_done file here.
- # d = os.path.abspath(uow_dir)
- # las_path_glob = glob.glob(os.path.join(d, '*.las'))
- # LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
- # if not las_path_glob:
- # time.sleep(WAIT)
- # las_path_glob = glob.glob(os.path.join(d, '*.las'))
- # LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
- # if not las_path_glob:
- # msg = 'Missing .las file. Skipping block for dir {}'.format(d)
- # LOG.error(msg)
- # if len(las_path_glob) > 1:
- # msg = 'Expected exactly 1 .las in {!r}, but found {}:\n {!r}'.format(
- # d, len(las_path_glob), las_path_glob)
- # LOG.warning(msg)
- # las_paths.extend(las_path_glob)
- las_paths = list()
- for las_fn in sorted(las_fns):
- if not os.path.exists(las_fn):
- msg = 'Did not find las-file {!r}. Waiting {} seconds.'.format(las_fn, WAIT)
- LOG.info(msg)
- time.sleep(WAIT)
- if not os.path.exists(las_fn):
- msg = 'Did not find las-file {!r}, even after waiting {} seconds. Maybe retry later?'.format(las_fn, WAIT)
- raise Exception(msg)
- #LOG.warning(las_fn)
- #continue
- las_paths.append(las_fn)
- # Map block nums to .las files.
- re_block = re.compile(r'\.(\d+)\.las$')
- block2las = dict()
- for fn in las_paths:
- mo = re_block.search(fn)
- if not mo:
- msg = 'Las file {!r} did not match regex {!r}.'.format(
- fn, re_block.pattern)
- raise Exception(msg)
- block = int(mo.group(1))
- block2las[block] = fn
- # Verify sequential block nums.
- blocks = sorted(block2las.keys())
- expected = list(range(1, len(blocks)+1))
- if blocks != expected:
- msg = '{!r} has {} .las files, but their block-numbers are not sequential: {!r}'.format(
- gathered_fn, len(blocks), blocks)
- raise Exception(msg)
- # Serialize result, plus an archaic file.
- io.serialize(las_paths_fn, sorted(block2las.values()))
- io.serialize(block2las_fn, block2las)
- def setup_logging(log_level):
- hdlr = logging.StreamHandler(sys.stderr)
- hdlr.setLevel(log_level)
- hdlr.setFormatter(logging.Formatter('[%(levelname)s]%(message)s'))
- LOG.addHandler(hdlr)
- LOG.setLevel(logging.NOTSET)
- LOG.info('Log-level: {}'.format(log_level))
- def cmd_build(args):
- ours = get_ours(args.config_fn, args.db_fn)
- build_db(ours, args.input_fofn_fn, args.db_fn, args.length_cutoff_fn)
- def cmd_track_combine(args):
- track_combine(args.db_fn, args.track, args.anno_fofn_fn, args.data_fofn_fn)
- def cmd_tan_split(args):
- ours = get_ours(args.config_fn, args.db_fn)
- tan_split(ours, args.config_fn, args.db_fn, args.split_fn, args.bash_template_fn)
- def cmd_tan_apply(args):
- tan_apply(args.db_fn, args.script_fn, args.job_done_fn)
- def cmd_tan_combine(args):
- tan_combine(args.db_fn, args.gathered_fn, args.new_db_fn)
- def cmd_rep_split(args):
- ours = get_ours(args.config_fn, args.db_fn)
- rep_split(
- ours, args.config_fn, args.db_fn,
- args.las_paths_fn, args.wildcards,
- args.group_size, args.coverage_limit,
- args.split_fn, args.bash_template_fn,
- )
- def cmd_rep_apply(args):
- rep_apply(args.db_fn, args.script_fn, args.job_done_fn)
- def cmd_rep_combine(args):
- rep_combine(args.db_fn, args.gathered_fn, args.group_size, args.new_db_fn)
- def cmd_rep_daligner_split(args):
- ours = get_ours(args.config_fn, args.db_fn)
- rep_daligner_split(
- ours, args.config_fn, args.db_fn, args.nproc,
- args.wildcards, args.group_size, args.coverage_limit,
- args.split_fn, args.bash_template_fn,
- )
- def cmd_daligner_split(args):
- ours = get_ours(args.config_fn, args.db_fn)
- daligner_split(
- ours, args.config_fn, args.db_fn, args.nproc,
- args.wildcards, args.length_cutoff_fn,
- args.split_fn, args.bash_template_fn,
- )
- def cmd_daligner_apply(args):
- daligner_apply(args.db_fn, args.script_fn, args.job_done_fn)
- def cmd_daligner_combine(args):
- daligner_combine(args.db_fn, args.gathered_fn, args.las_paths_fn)
- def cmd_merge_split(args):
- merge_split(
- args.config_fn, args.db_prefix, args.las_paths_fn,
- args.wildcards,
- args.split_fn, args.bash_template_fn,
- )
- def cmd_merge_apply(args):
- merge_apply(args.las_paths_fn, args.las_fn)
- def cmd_merge_combine(args):
- merge_combine(args.gathered_fn, args.las_paths_fn, args.block2las_fn)
- options_note = """
- For raw_reads.db, we also look for the following config keys:
- - pa_DBsplit_option
- - pa_HPCdaligner_option
- - pa_HPCTANmask_option
- - pa_daligner_option
- - length_cutoff: -1 => calculate based on "genome_size" and "seed_coverage" config.
- - seed_coverage
- - genome_size
- For preads.db, these are named:
- - ovlp_DBsplit_option
- - ovlp_HPCdaligner_option
- - ovlp_daligner_option
- - length_cutoff_pr
- """
- def get_ours(config_fn, db_fn):
- ours = dict()
- config = io.deserialize(config_fn)
- ours['genome_size'] = int(config['genome_size'])
- ours['seed_coverage'] = float(config['seed_coverage'])
- if os.path.basename(db_fn).startswith('preads'):
- ours['DBdust_opt'] = config.get('ovlp_DBdust_option', '')
- ours['DBsplit_opt'] = config.get('ovlp_DBsplit_option', '')
- ours['daligner_opt'] = config.get('ovlp_daligner_option', '') + ' ' + config.get('ovlp_HPCdaligner_option', '')
- ours['user_length_cutoff'] = int(config.get('length_cutoff_pr', '0'))
- ours['fasta_filter_option'] = 'pass'
- ours['subsample_coverage'] = 0
- ours['subsample_random_seed'] = int(config.get('pa_subsample_random_seed', 0))
- ours['subsample_strategy'] = config.get('pa_subsample_strategy', 'random')
- else:
- ours['DBdust_opt'] = config.get('pa_DBdust_option', '')
- ours['DBsplit_opt'] = config.get('pa_DBsplit_option', '')
- ours['daligner_opt'] = config.get('pa_daligner_option', '') + ' ' + config.get('pa_HPCdaligner_option', '')
- ours['TANmask_opt'] = config.get('pa_daligner_option', '') + ' ' + config.get('pa_HPCTANmask_option', '')
- ours['REPmask_opt'] = config.get('pa_daligner_option', '') + ' ' + config.get('pa_HPCREPmask_option', '')
- ours['user_length_cutoff'] = int(config.get('length_cutoff', '0'))
- ours['fasta_filter_option'] = config.get('pa_fasta_filter_option', 'pass')
- ours['subsample_coverage'] = int(config.get('pa_subsample_coverage', 0))
- ours['subsample_random_seed'] = int(config.get('pa_subsample_random_seed', 0))
- ours['subsample_strategy'] = config.get('pa_subsample_strategy', 'random')
- LOG.info('config({!r}):\n{}'.format(config_fn, config))
- LOG.info('our subset of config:\n{}'.format(ours))
- return ours
- def add_build_arguments(parser):
- parser.add_argument(
- '--input-fofn-fn', required=True,
- help='input. User-provided file of fasta filenames. Relative paths are relative to the directory of the FOFN.',
- )
- parser.add_argument(
- '--length-cutoff-fn', required=True,
- help='output. Simple file of a single integer, either calculated or specified by --user-length-cutoff.'
- )
- def add_track_combine_arguments(parser):
- parser.add_argument(
- '--anno-fofn-fn', required=True,
- help='input. FOFN for .anno track files, which are 1:1 with .data track files.',
- )
- parser.add_argument(
- '--data-fofn-fn', required=True,
- help='input. FOFN for .data track files, which are 1:1 with .anno track files.',
- )
- parser.add_argument(
- '--track', required=True,
- help='Name of the Dazzler DB track. (e.g. "tan" or "rep1")',
- )
- def add_tan_split_arguments(parser):
- parser.add_argument(
- '--split-fn', default='tan-mask-uows.json',
- help='output. Units-of-work from HPC.TANmask, for datander.',
- )
- parser.add_argument(
- '--bash-template-fn', default='bash-template.sh',
- help='output. Script to apply later.',
- )
- def add_tan_apply_arguments(parser):
- parser.add_argument(
- '--script-fn', required=True,
- help='input. Script to run datander.',
- )
- parser.add_argument(
- '--job-done-fn', default='job.done',
- help='output. Sentinel.',
- )
- def add_tan_combine_arguments(parser):
- parser.add_argument(
- '--gathered-fn', required=True,
- help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The tan-track files are next to these.',
- )
- parser.add_argument(
- '--new-db-fn', required=True,
- help='output. This must match the input DB name. It will be symlinked, except the new track files.',
- )
- def add_rep_split_arguments(parser):
- parser.add_argument(
- '--wildcards', #default='rep1_id',
- help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
- )
- parser.add_argument(
- '--group-size', '-g', required=True, type=int,
- help='Number of blocks per group. This should match what was passed to HPC.REPmask. Here, it becomes part of the mask name, repN.',
- )
- parser.add_argument(
- '--coverage-limit', '-c', required=True, type=int,
- help='Coverage threshold for masking.',
- )
- parser.add_argument(
- '--las-paths-fn', required=True,
- help='input. JSON list of las paths. These will have the format of standard daligner/LAmerge (foo.N.las), rather than of REPmask (foo.R1.N.las).',
- # so we will symlink as we use? (TODO: Also delete after use?)
- )
- parser.add_argument(
- '--split-fn', default='rep-mask-uows.json',
- help='output. Units-of-work from earlier HPC.REPmask, for REPmask.',
- )
- parser.add_argument(
- '--bash-template-fn', default='bash-template.sh',
- help='output. Script to apply later.',
- )
- def add_rep_apply_arguments(parser):
- parser.add_argument(
- '--script-fn', required=True,
- help='input. Script to run REPmask.',
- )
- parser.add_argument(
- '--job-done-fn', default='job.done',
- help='output. Sentinel.',
- )
- def add_rep_combine_arguments(parser):
- parser.add_argument(
- '--group-size', '-g', required=True, type=int,
- help='Number of blocks per group. This should match what was passed to HPC.REPmask. Here, it becomes part of the mask name, repN.',
- )
- parser.add_argument(
- '--gathered-fn', required=True,
- help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The rep-track files are next to these.',
- )
- parser.add_argument(
- '--new-db-fn', required=True,
- help='output. This must match the input DB name. It will be symlinked, except the new track files.',
- )
- def add_rep_daligner_split_arguments(parser):
- parser.add_argument(
- '--wildcards', default='dummy_wildcard',
- help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
- )
- parser.add_argument(
- '--group-size', '-g', required=True, type=int,
- help='Number of blocks per group. This should match what was passed to HPC.REPmask. Here, it becomes part of the mask name, repN.',
- )
- parser.add_argument(
- '--coverage-limit', '-c', required=True, type=int,
- help='Coverage threshold for masking.',
- )
- parser.add_argument(
- '--split-fn', default='rep-daligner-uows.json',
- help='output. Units-of-work from HPC.REPmask, for daligner.',
- )
- parser.add_argument(
- '--bash-template-fn', default='bash-template.sh',
- help='output. Script to apply later.',
- )
- def add_daligner_split_arguments(parser):
- parser.add_argument(
- '--wildcards', default='dummy_wildcard',
- help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
- )
- parser.add_argument(
- '--length-cutoff-fn', required=True,
- help='input. Simple file of a single integer, either calculated or specified by --user-length-cutoff.'
- )
- parser.add_argument(
- '--split-fn', default='daligner-mask-uows.json',
- help='output. Units-of-work from HPC.daligner, for daligner.',
- )
- parser.add_argument(
- '--bash-template-fn', default='bash-template.sh',
- help='output. Script to apply later.',
- )
- def add_daligner_apply_arguments(parser):
- parser.add_argument(
- '--script-fn', required=True,
- help='input. Script to run daligner.',
- )
- parser.add_argument(
- '--job-done-fn', default='job.done',
- help='output. Sentinel.',
- )
- def add_daligner_combine_arguments(parser):
- parser.add_argument(
- '--gathered-fn', required=True,
- help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The .las files are next to these.',
- )
- parser.add_argument(
- '--las-paths-fn', required=True,
- help='output. JSON list of las paths.')
- def add_merge_split_arguments(parser):
- parser.add_argument(
- '--wildcards', default='mer0_id',
- help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
- )
- parser.add_argument(
- '--db-prefix', required=True,
- help='DB is named "prefix.db", and the prefix is expected to match .las files',
- )
- parser.add_argument(
- '--las-paths-fn',
- help='input. foo.a.foo.b.las files from daligner.',
- )
- parser.add_argument(
- '--split-fn', default='merge-uows.json',
- help='output. Units-of-work for LAmerge.',
- )
- parser.add_argument(
- '--bash-template-fn', default='bash-template.sh',
- help='output. Script to apply later.',
- )
- def add_merge_apply_arguments(parser):
- parser.add_argument(
- '--las-paths-fn', required=True,
- help='input. JSON list of las paths to merge. These must be .las "pairs" (foo.a.foo.b.las). The a-blocks must all be the same. Ultimately, we will generate a single .las from these, named after the a-block.')
- parser.add_argument(
- '--las-fn', required=True,
- help='output. The merged las-file.',
- )
- def add_merge_combine_arguments(parser):
- parser.add_argument(
- '--gathered-fn', required=True,
- help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The .las files are next to these.',
- )
- parser.add_argument(
- '--las-paths-fn', required=True,
- help='output. JSON list of las paths.')
- parser.add_argument(
- '--block2las-fn', required=True,
- help='output. JSON dict of block (int) to las.')
- class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
- pass
- def parse_args(argv):
- description = 'Basic daligner steps: build; split into units-of-work; combine results and prepare for next step.'
- epilog = 'These tasks perform the split/apply/combine strategy (of which map/reduce is a special case).' + options_note
- parser = argparse.ArgumentParser(
- description=description,
- epilog=epilog,
- formatter_class=HelpF,
- )
- parser.add_argument(
- '--log-level', default='INFO',
- help='Python logging level.',
- )
- parser.add_argument(
- '--nproc', type=int, default=0,
- help='ignored for now, but non-zero will mean "No more than this."',
- )
- parser.add_argument(
- '--config-fn', required=True,
- help='Input. JSON of user-configuration. (This is probably the [General] section.)',
- )
- parser.add_argument(
- '--db-fn', default='raw_reads.db',
- help='Input or Output. Dazzler DB. (Dot-files are implicit.)',
- )
- help_build = 'build Dazzler DB for raw_reads; calculate length-cutoff for HGAP seed reads; split Dazzler DB into blocks; run DBdust to mask low-complexity regions'
- help_track_combine = 'given .anno and .data fofns, symlink into CWD, then run Catrack on partial track-files, to produce a mask'
- help_tan_split = 'generate units-of-work for datander, via HPC.TANmask'
- help_tan_apply = 'run datander and TANmask as a unit-of-work (according to output of HPC.TANmask), and remove .las files'
- help_tan_combine = 'run Catrack on partial track-files, to produce a "tan" mask'
- help_rep_split = 'generate units-of-work for REPmask, given earlier HPC.REPmask; daligner and LAmerge have already occurred'
- help_rep_apply = 'run daligner and REPmask as a unit-of-work (according to output of HPC.REPmask), and remove .las files'
- help_rep_combine = 'run Catrack on partial track-files, to produce a "rep" mask'
- help_rep_daligner_split = 'generate units-of-work for daligner, via HPC.REPmask; should be followed by daligner-apply and daligner-combine, then merge-*, then rep-*'
- help_daligner_split = 'generate units-of-work for daligner, via HPC.daligner'
- help_daligner_apply = 'run daligner as a unit-of-work (according to output of HPC.daligner)'
- help_daligner_combine = 'gather the .las files from daligner runs into a JSON list of las paths'
- help_merge_split = 'given a JSON list of .las paths, generate units-of-work for LAmerge'
- help_merge_apply = 'run merge as a unit-of-work, and (possibly) remove .las files'
- help_merge_combine = 'generate a file of .las files'
- subparsers = parser.add_subparsers(help='sub-command help')
- parser_build = subparsers.add_parser('build',
- formatter_class=HelpF,
- description=help_build,
- help=help_build)
- add_build_arguments(parser_build)
- parser_build.set_defaults(func=cmd_build)
- parser_track_combine = subparsers.add_parser('track-combine',
- formatter_class=HelpF,
- description=help_track_combine,
- epilog='To use these as a mask, subsequent steps will need to add "-mTRACK".',
- help=help_track_combine)
- add_track_combine_arguments(parser_track_combine)
- parser_track_combine.set_defaults(func=cmd_track_combine)
- parser_tan_split = subparsers.add_parser('tan-split',
- formatter_class=HelpF,
- description=help_tan_split,
- epilog='',
- help=help_tan_split)
- add_tan_split_arguments(parser_tan_split)
- parser_tan_split.set_defaults(func=cmd_tan_split)
- parser_tan_apply = subparsers.add_parser('tan-apply',
- formatter_class=HelpF,
- description=help_tan_apply,
- epilog='',
- help=help_tan_apply)
- add_tan_apply_arguments(parser_tan_apply)
- parser_tan_apply.set_defaults(func=cmd_tan_apply)
- parser_tan_combine = subparsers.add_parser('tan-combine',
- formatter_class=HelpF,
- description=help_tan_combine,
- epilog='The result will be mostly symlinks, plus new tan-track files. To use these as a mask, subsequent steps will need to add "-mtan".',
- help=help_tan_combine)
- add_tan_combine_arguments(parser_tan_combine)
- parser_tan_combine.set_defaults(func=cmd_tan_combine)
- parser_rep_split = subparsers.add_parser('rep-split',
- formatter_class=HelpF,
- description=help_rep_split,
- epilog='',
- help=help_rep_split)
- add_rep_split_arguments(parser_rep_split)
- parser_rep_split.set_defaults(func=cmd_rep_split)
- parser_rep_apply = subparsers.add_parser('rep-apply',
- formatter_class=HelpF,
- description=help_rep_apply,
- epilog='',
- help=help_rep_apply)
- add_rep_apply_arguments(parser_rep_apply)
- parser_rep_apply.set_defaults(func=cmd_rep_apply)
- parser_rep_combine = subparsers.add_parser('rep-combine',
- formatter_class=HelpF,
- description=help_rep_combine,
- epilog='The result will be mostly symlinks, plus new rep-track files. To use these as a mask, subsequent steps will need to add "-mrep".',
- help=help_rep_combine)
- add_rep_combine_arguments(parser_rep_combine)
- parser_rep_combine.set_defaults(func=cmd_rep_combine)
- parser_rep_daligner_split = subparsers.add_parser('rep-daligner-split',
- formatter_class=HelpF,
- description=help_rep_daligner_split,
- epilog='HPC.REPmask will be passed mask flags for any mask tracks which we glob.',
- help=help_rep_daligner_split)
- add_rep_daligner_split_arguments(parser_rep_daligner_split)
- parser_rep_daligner_split.set_defaults(func=cmd_rep_daligner_split)
- parser_daligner_split = subparsers.add_parser('daligner-split',
- formatter_class=HelpF,
- description=help_daligner_split,
- epilog='HPC.daligner will be passed mask flags for any mask tracks which we glob. Note: if db is named "preads.db", we run daligner_p instead of daligner.',
- help=help_daligner_split)
- add_daligner_split_arguments(parser_daligner_split)
- parser_daligner_split.set_defaults(func=cmd_daligner_split)
- parser_daligner_apply = subparsers.add_parser('daligner-apply',
- formatter_class=HelpF,
- description=help_daligner_apply,
- epilog='',
- help=help_daligner_apply)
- add_daligner_apply_arguments(parser_daligner_apply)
- parser_daligner_apply.set_defaults(func=cmd_daligner_apply)
- #parser_untar_daligner_apply = subparsers.add_parser('untar-daligner-apply',
- # formatter_class=HelpF,
- # description=help_untar_daligner_apply,
- # epilog='',
- # help=help_untar_daligner_apply)
- #add_untar_daligner_apply_arguments(parser_untar_daligner_apply)
- #parser_untar_daligner_apply.set_defaults(func=cmd_untar_daligner_apply)
- parser_daligner_combine = subparsers.add_parser('daligner-combine',
- formatter_class=HelpF,
- description=help_daligner_combine,
- epilog='',
- help=help_daligner_combine)
- add_daligner_combine_arguments(parser_daligner_combine)
- parser_daligner_combine.set_defaults(func=cmd_daligner_combine)
- parser_merge_split = subparsers.add_parser('merge-split',
- formatter_class=HelpF,
- description=help_merge_split,
- epilog='Groups .las pair-files by their a-block; each unit-of-work merges one block.',
- help=help_merge_split)
- add_merge_split_arguments(parser_merge_split)
- parser_merge_split.set_defaults(func=cmd_merge_split)
- parser_merge_apply = subparsers.add_parser('merge-apply',
- formatter_class=HelpF,
- description=help_merge_apply,
- epilog='Ultimately, there will be 1 .las per block.',
- help=help_merge_apply)
- add_merge_apply_arguments(parser_merge_apply)
- parser_merge_apply.set_defaults(func=cmd_merge_apply)
- parser_merge_combine = subparsers.add_parser('merge-combine',
- formatter_class=HelpF,
- description=help_merge_combine,
- epilog='',
- help=help_merge_combine)
- add_merge_combine_arguments(parser_merge_combine)
- parser_merge_combine.set_defaults(func=cmd_merge_combine)
- args = parser.parse_args(argv[1:])
- return args
- def main(argv=sys.argv):
- args = parse_args(argv)
- setup_logging(args.log_level)
- LOG.info("====>[{}],\n====>args={}".format(args.func, args))
- args.func(args)
- if __name__ == '__main__': # pragma: no cover
- main()