dazzler.py

'''
# DAMASKER options
"""
Example config usage:
pa_use_tanmask = true
pa_use_repmask = true
pa_HPCtanmask_option =
pa_repmask_levels = 2
pa_HPCrepmask_1_option = -g1 -c20 -mtan
pa_HPCrepmask_2_option = -g10 -c15 -mtan -mrep1
pa_damasker_HPCdaligner_option = -mtan -mrep1 -mrep10
"""
pa_use_tanmask = False
if config.has_option(section, 'pa_use_tanmask'):
    pa_use_tanmask = config.getboolean(section, 'pa_use_tanmask')
pa_HPCtanmask_option = ""
if config.has_option(section, 'pa_HPCtanmask_option'):
    pa_HPCtanmask_option = config.get(section, 'pa_HPCtanmask_option')
pa_use_repmask = False
if config.has_option(section, 'pa_use_repmask'):
    pa_use_repmask = config.getboolean(section, 'pa_use_repmask')
pa_repmask_levels = 0  # REPmask tool can be used multiple times.
if config.has_option(section, 'pa_repmask_levels'):
    pa_repmask_levels = config.getint(section, 'pa_repmask_levels')
pa_HPCrepmask_1_option = """ -g1 -c20 -mtan"""
if config.has_option(section, 'pa_HPCrepmask_1_option'):
    pa_HPCrepmask_1_option = config.get(section, 'pa_HPCrepmask_1_option')
pa_HPCrepmask_2_option = """ -g10 -c15 -mtan -mrep1"""
if config.has_option(section, 'pa_HPCrepmask_2_option'):
    pa_HPCrepmask_2_option = config.get(section, 'pa_HPCrepmask_2_option')
pa_damasker_HPCdaligner_option = """ -mtan -mrep1 -mrep10"""  # Repeat masks need to be passed to Daligner.
if config.has_option(section, 'pa_damasker_HPCdaligner_option'):
    pa_damasker_HPCdaligner_option = config.get(
        section, 'pa_damasker_HPCdaligner_option')
# End of DAMASKER options.
# Note: We dump the 'length_cutoff' file for later reference within the preassembly report
# of pbsmrtpipe HGAP.
# However, it is not a true dependency because we still have a workflow that starts
# from 'corrected reads' (preads), in which case build_db is not run.
Catrack -dfv raw_reads.db tan
'''
import argparse
import collections
import glob
import itertools
import logging
import os
import re
import sys
import tarfile
from ..util.io import yield_validated_fns
from .. import io, functional
from .. import (
    bash,  # for write_sub_script
    pype_tasks,  # for TASKS
)

LOG = logging.getLogger()
WAIT = 20  # seconds to wait for file to exist

def filter_DBsplit_option(opt):
    """Always add -a.
    If we want fewer reads, we rely on the fasta_filter.
    Also, add -x, as daligner belches on any read < kmer length.
    """
    flags = opt.split()
    if '-a' not in opt:
        flags.append('-a')
    if '-x' not in opt:
        flags.append('-x70')  # daligner belches on any read < kmer length
    return ' '.join(flags)
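
# Illustrative behavior (a sketch, not a doctest that ships with the module):
#   filter_DBsplit_option('-x500 -s200')  -> '-x500 -s200 -a'
#   filter_DBsplit_option('-s200')        -> '-s200 -a -x70'
# The membership tests are substring checks on the raw option string, so an
# explicit '-x' of any value suppresses the default '-x70'.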

def script_build_db(config, input_fofn_fn, db):
    """
    db (e.g. 'raw_reads.db') will be output into CWD, should not already exist.
    'dust' track will also be generated.
    """
    params = dict(config)
    try:
        cat_fasta = functional.choose_cat_fasta(open(input_fofn_fn).read())
    except Exception:
        LOG.exception('Using "cat" by default.')
        cat_fasta = 'cat '
    DBdust = 'DBdust {} {}'.format(config.get('DBdust_opt', ''), db)
    fasta_filter_option = config.get('fasta_filter_option', 'pass')
    subsample_coverage = config.get('subsample_coverage', 0)
    subsample_strategy = config.get('subsample_strategy', 'random')
    subsample_random_seed = config.get('subsample_random_seed', 0)
    genome_size = config.get('genome_size', 0)
    zmw_whitelist_option = ''
    use_subsampling = 0
    if subsample_coverage > 0:
        use_subsampling = 1
        zmw_whitelist_option = '--zmw-whitelist-fn zmw.whitelist.json'
    params.update(locals())
    script = """\
echo "PBFALCON_ERRFILE=$PBFALCON_ERRFILE"
set -o pipefail
rm -f {db}.db {db}.dam .{db}.* # in case of re-run
#fc_fasta2fasta < {input_fofn_fn} >| fc.fofn
zmw_whitelist_option=""
use_subsampling={use_subsampling}
if [[ $use_subsampling -eq 1 ]]; then
    while read fn; do {cat_fasta} ${{fn}} | python3 -m falcon_kit.mains.zmw_collect; done < {input_fofn_fn} > zmw.all.tsv
    cat zmw.all.tsv | python3 -m falcon_kit.mains.zmw_subsample --coverage "{subsample_coverage}" --random-seed "{subsample_random_seed}" --strategy "{subsample_strategy}" --genome-size "{genome_size}" zmw.whitelist.json
    zmw_whitelist_option="--zmw-whitelist-fn zmw.whitelist.json"
fi
while read fn; do {cat_fasta} ${{fn}} | python3 -m falcon_kit.mains.fasta_filter ${{zmw_whitelist_option}} {fasta_filter_option} - | fasta2DB -v {db} -i${{fn##*/}}; done < {input_fofn_fn}
#cat fc.fofn | xargs rm -f
{DBdust}
""".format(**params)
    return script

def script_length_cutoff(config, db, length_cutoff_fn='length_cutoff'):
    params = dict(config)
    length_cutoff = config['user_length_cutoff']
    if int(length_cutoff) < 0:
        bash_cutoff = '$(python3 -m falcon_kit.mains.calc_cutoff --coverage {} {} <(DBstats -b1 {}))'.format(
            params['seed_coverage'], params['genome_size'], db)
    else:
        bash_cutoff = '{}'.format(length_cutoff)
    params.update(locals())
    return """
CUTOFF={bash_cutoff}
echo -n $CUTOFF >| {length_cutoff_fn}
""".format(**params)

def script_DBsplit(config, db):
    params = dict(config)
    params.update(locals())
    DBsplit_opt = filter_DBsplit_option(config['DBsplit_opt'])
    params.update(locals())
    return """
DBsplit -f {DBsplit_opt} {db}
#LB=$(cat {db}.db | LD_LIBRARY_PATH= awk '$1 == "blocks" {{print $3}}')
#echo -n $LB >| db_block_count
""".format(**params)

def build_db(config, input_fofn_fn, db_fn, length_cutoff_fn):
    LOG.info('Building rdb from {!r}, to write {!r}'.format(
        input_fofn_fn, db_fn))
    db = os.path.splitext(db_fn)[0]
    # First, fix up the FOFN for this dir.
    my_input_fofn_fn = 'my.' + os.path.basename(input_fofn_fn)
    with open(my_input_fofn_fn, 'w') as stream:
        for fn in yield_validated_fns(input_fofn_fn):
            stream.write(fn)
            stream.write('\n')
    script = ''.join([
        script_build_db(config, my_input_fofn_fn, db),
        script_DBsplit(config, db),
        script_length_cutoff(config, db, length_cutoff_fn),
    ])
    script_fn = 'build_db.sh'
    with open(script_fn, 'w') as ofs:
        exe = bash.write_sub_script(ofs, script)
    io.syscall('bash -vex {}'.format(script_fn))

def script_HPC_daligner(config, db, length_cutoff_fn, tracks, prefix):
    params = dict(config)
    masks = ' '.join('-m{}'.format(track) for track in tracks)
    params.update(locals())
    symlink(length_cutoff_fn, 'CUTOFF')
    return """
#LB=$(cat db_block_count)
CUTOFF=$(cat CUTOFF)
rm -f daligner-jobs.*
echo "SMRT_PYTHON_PATH_PREPEND=$SMRT_PYTHON_PATH_PREPEND"
echo "PATH=$PATH"
which HPC.daligner
HPC.daligner -P. {daligner_opt} {masks} -H$CUTOFF -f{prefix} {db}
""".format(**params)

def script_HPC_TANmask(config, db, tracks, prefix):
    assert prefix and '/' not in prefix
    params = dict(config)
    #masks = ' '.join('-m{}'.format(track) for track in tracks)
    params.update(locals())
    return """
rm -f {prefix}.*
rm -f .{db}.*.tan.anno
rm -f .{db}.*.tan.data
echo "SMRT_PYTHON_PATH_PREPEND=$SMRT_PYTHON_PATH_PREPEND"
echo "PATH=$PATH"
which HPC.TANmask
HPC.TANmask -P. {TANmask_opt} -v -f{prefix} {db}
""".format(**params)

def script_HPC_REPmask(config, db, tracks, prefix, group_size, coverage_limit):
    if group_size == 0:  # TODO: Make this a no-op.
        group_size = 1
        coverage_limit = 10**9  # an arbitrary large number
    assert prefix and '/' not in prefix
    params = dict(config)
    masks = ' '.join('-m{}'.format(track) for track in tracks)
    params.update(locals())
    return """
rm -f {prefix}.*
rm -f .{db}.*.rep*.anno
rm -f .{db}.*.rep*.data
echo "SMRT_PYTHON_PATH_PREPEND=$SMRT_PYTHON_PATH_PREPEND"
echo "PATH=$PATH"
which HPC.REPmask
HPC.REPmask -P. -g{group_size} -c{coverage_limit} {REPmask_opt} {masks} -v -f{prefix} {db}
""".format(**params)

def symlink(actual, symbolic=None, force=True):
    """Symlink into cwd, relatively.
    symbolic name is basename(actual) if not provided.
    If not force, raise when already exists and does not match.
    But ignore symlink to self.
    """
    symbolic = os.path.basename(actual) if not symbolic else symbolic
    if os.path.abspath(actual) == os.path.abspath(symbolic):
        LOG.warning('Cannot symlink {!r} as {!r}, itself.'.format(actual, symbolic))
        return
    rel = os.path.relpath(actual)
    if force:
        LOG.info('ln -sf {} {}'.format(rel, symbolic))
        if os.path.lexists(symbolic):
            if os.readlink(symbolic) == rel:
                return
            else:
                os.unlink(symbolic)
    else:
        LOG.info('ln -s {} {}'.format(rel, symbolic))
        if os.path.lexists(symbolic):
            if os.readlink(symbolic) != rel:
                msg = '{!r} already exists as {!r}, not {!r}'.format(
                    symbolic, os.readlink(symbolic), rel)
                raise Exception(msg)
            else:
                LOG.info('{!r} already points to {!r}'.format(symbolic, rel))
                return
    os.symlink(rel, symbolic)
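
# Usage sketch (hypothetical paths). With force=True (the default), a stale
# link is replaced, while a link that already points at the right relative
# target is left alone:
#   symlink('../build/length_cutoff', 'CUTOFF')   # CUTOFF -> ../build/length_cutoff
#   symlink('other-file', 'CUTOFF', force=False)  # raises if CUTOFF points elsewhere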

def find_db(db):
    if db.endswith('.db') or db.endswith('.dam'):
        if not os.path.exists(db):
            raise Exception('DB "{}" does not exist.'.format(db))
        return db
    db_fn = db + '.db'
    if os.path.exists(db_fn):
        return db_fn
    db_fn = db + '.dam'
    if os.path.exists(db_fn):
        return db_fn
    raise Exception('Could not find DB "{}"'.format(db))
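
# Resolution sketch: an explicit '.db'/'.dam' path must exist as given;
# otherwise we try appending '.db' first, then '.dam':
#   find_db('raw_reads')      -> 'raw_reads.db' if present, else 'raw_reads.dam'
#   find_db('raw_reads.dam')  -> 'raw_reads.dam', or an exception if missing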

def symlink_db(db_fn, symlink=symlink):
    """Symlink (into cwd) everything that could be related to this Dazzler DB.
    Exact matches will probably cause an exception in symlink().
    """
    if not os.path.exists(db_fn):
        db_fn = find_db(db_fn)
    db_dirname, db_basename = os.path.split(os.path.normpath(db_fn))
    if not db_dirname:
        return  # must be here already
    dbname, suffix = os.path.splitext(db_basename)
    # Note: could be .db or .dam
    fn = os.path.join(db_dirname, dbname + suffix)
    symlink(fn)
    re_suffix = re.compile(r'^\.%s(\.idx|\.bps|\.dust\.data|\.dust\.anno|\.tan\.data|\.tan\.anno|\.rep\d+\.data|\.rep\d+\.anno)$' % dbname)
    all_basenames = os.listdir(db_dirname)
    for basename in sorted(all_basenames):
        mo = re_suffix.search(basename)
        if not mo:
            continue
        fn = os.path.join(db_dirname, basename)
        if os.path.exists(fn):
            symlink(fn)
        else:
            LOG.warning('Symlink {!r} seems to be broken.'.format(fn))
    return dbname
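
# For a DB named 'raw_reads', re_suffix above is meant to catch dot-files like
# these (an illustrative, not exhaustive, listing):
#   .raw_reads.idx        .raw_reads.bps
#   .raw_reads.dust.anno  .raw_reads.dust.data
#   .raw_reads.tan.anno   .raw_reads.tan.data
#   .raw_reads.rep1.anno  .raw_reads.rep1.data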

def tan_split(config, config_fn, db_fn, uows_fn, bash_template_fn):
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_DB_TAN_APPLY_SCRIPT)
    # TANmask would put track-files in the DB-directory, not '.',
    # so we need to symlink everything first.
    symlink_db(db_fn)
    db = os.path.splitext(db_fn)[0]
    dbname = os.path.basename(db)
    tracks = get_tracks(db_fn)
    # We assume the actual DB will be symlinked here.
    base_db = os.path.basename(db)
    script = ''.join([
        script_HPC_TANmask(config, base_db, tracks, prefix='tan-jobs'),
    ])
    script_fn = 'split.sh'
    with open(script_fn, 'w') as ofs:
        exe = bash.write_sub_script(ofs, script)
    io.syscall('bash -vex {}'.format(script_fn))
    # We now have files like tan-jobs.01.OVL.
    # We need to parse that one. (We ignore the others.)
    lines = open('tan-jobs.01.OVL').readlines()
    re_block = re.compile(r'{}(\.\d+|)'.format(dbname))

    def get_blocks(line):
        """Return ['.1', '.2', ...]
        """
        return [mo.group(1) for mo in re_block.finditer(line)]
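    # Sketch of what get_blocks() yields on typical datander lines (with
    # dbname='raw_reads'; the exact HPC.TANmask output format may vary):
    #   'datander ... raw_reads.1 raw_reads.2' -> ['.1', '.2']
    #   'datander ... raw_reads'               -> ['']  (single-block special case below)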
    scripts = list()
    for line in lines:
        if line.startswith('#'):
            continue
        if not line.strip():
            continue
        blocks = get_blocks(line)
        assert blocks, 'No blocks found in {!r} from {!r}'.format(line, 'tan-jobs.01.OVL')
        las_files = ' '.join('TAN.{db}{block}.las'.format(db=dbname, block=block) for block in blocks)
        # We assume the actual DB will be symlinked here.
        base_db = os.path.basename(db)
        script_lines = [
            line,
            'LAcheck {} {}\n'.format(base_db, las_files),
            'TANmask {} {}\n'.format(base_db, las_files),
            'rm -f {}\n'.format(las_files),
        ]
        if [''] == blocks:
            # Special case -- if we have only 1 block, then HPC.TANmask fails to use the block-number.
            # However, even with multiple blocks, it is still possible for a single line to have
            # only 1 block. So we look for a solitary block that is '', and we rename the track-files
            # to pretend that they were named properly in the first place.
            script_lines.append('mv .{db}.tan.data .{db}.1.tan.data\n'.format(db=dbname))
            script_lines.append('mv .{db}.tan.anno .{db}.1.tan.anno\n'.format(db=dbname))
        scripts.append(''.join(script_lines))
    jobs = list()
    uow_dirs = list()
    for i, script in enumerate(scripts):
        job_id = 'tan_{:03d}'.format(i)
        script_dir = os.path.join('.', 'tan-scripts', job_id)
        script_fn = os.path.join(script_dir, 'run_datander.sh')
        io.mkdirs(script_dir)
        with open(script_fn, 'w') as stream:
            stream.write('{}\n'.format(script))
        # Record in a job-dict.
        job = dict()
        job['input'] = dict(
            config=config_fn,
            db=db_fn,
            script=script_fn,
        )
        job['output'] = dict(
            job_done='job.done'
        )
        job['params'] = dict(
        )
        job['wildcards'] = dict(
            tan0_id=job_id,
        )
        jobs.append(job)
        # Write into a uow directory.
        uow_dn = 'uow-{:04d}'.format(i)
        io.mkdirs(uow_dn)
        with io.cd(uow_dn):
            script_fn = 'uow.sh'
            with open(script_fn, 'w') as stream:
                stream.write(script)
            # Add symlinks.
            symlink_db(os.path.join('..', base_db))
        uow_dirs.append(uow_dn)
    io.serialize(uows_fn, jobs)
    # For Cromwell, we use a tar-file instead.
    move_into_tar('all-units-of-work', uow_dirs)

def tan_apply(db_fn, script_fn, job_done_fn):
    # datander would put track-files in the DB-directory, not '.',
    # so we need to symlink everything first.
    db = symlink_db(db_fn)
    symlink(script_fn)
    io.syscall('bash -vex {}'.format(os.path.basename(script_fn)))
    io.touch(job_done_fn)

def track_combine(db_fn, track, anno_fofn_fn, data_fofn_fn):
    # track is typically 'tan' or 'repN'.
    symlink_db(db_fn)
    db = os.path.splitext(db_fn)[0]
    dbname = os.path.basename(db)

    def symlink_unprefixed(fn):
        bn = os.path.basename(fn)
        if bn.startswith('dot'):
            bn = bn[3:]
        return symlink(fn, bn)
    # Inputs will be symlinked into CWD, sans our prefix (assumed to be "dot").
    # Note: we cannot use "validated" yielders b/c these can be zero-size.
    for (i, pair) in enumerate(itertools.zip_longest(
            io.yield_abspath_from_fofn(anno_fofn_fn),
            io.yield_abspath_from_fofn(data_fofn_fn))):
        (anno_fn, data_fn) = pair
        symlink_unprefixed(anno_fn)
        symlink_unprefixed(data_fn)
    cmd = 'Catrack -vdf {} {}'.format(dbname, track)
    LOG.info(cmd)
    io.syscall(cmd)

def tan_combine(db_fn, gathered_fn, new_db_fn):
    new_db = os.path.splitext(new_db_fn)[0]
    db = symlink_db(db_fn)
    assert db == new_db, 'basename({!r})!={!r}, but old and new DB names must match.'.format(db_fn, new_db_fn)
    # Remove old, in case of resume.
    io.syscall('rm -f .{db}.*.tan.anno .{db}.*.tan.data'.format(db=db))
    gathered = io.deserialize(gathered_fn)
    gathered_dn = os.path.dirname(gathered_fn)
    # Create symlinks for all track-files.
    for job in gathered:
        done_fn = job['job_done']
        done_dn = os.path.dirname(done_fn)
        if not os.path.isabs(done_dn):
            LOG.info('Found relative done-file: {!r}'.format(done_fn))
            done_dn = os.path.join(gathered_dn, done_dn)
        annos = glob.glob('{}/.{}.*.tan.anno'.format(done_dn, db))
        datas = glob.glob('{}/.{}.*.tan.data'.format(done_dn, db))
        assert len(annos) == len(datas), 'Mismatched globs:\n{!r}\n{!r}'.format(annos, datas)
        for fn in annos + datas:
            symlink(fn, force=False)
    cmd = 'Catrack -vdf {} tan'.format(db)
    io.syscall(cmd)

def rep_split(config, config_fn, db_fn, las_paths_fn, wildcards, group_size, coverage_limit, uows_fn, bash_template_fn):
    """For foo.db, HPC.REPmask would produce rep-jobs.05.MASK lines like this:
        # REPmask jobs (n)
        REPmask -v -c30 -nrep1 foo foo.R1.@1-3
        REPmask -v -c30 -nrep1 foo foo.R1.@4-6
        ...
    (That's for level R1.)
    We will do one block at-a-time, for simplicity.
    """
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_DB_REP_APPLY_SCRIPT)
    db = symlink_db(db_fn)
    las_paths = io.deserialize(las_paths_fn)
    scripts = list()
    for i, las_fn in enumerate(las_paths):
        las_files = las_fn  # one at-a-time
        # We assume the actual DB will be symlinked here.
        base_db = os.path.basename(db)
        script_lines = [
            #'LAcheck {} {}\n'.format(db, las_files),
            'REPmask -v -c{} -nrep{} {} {}\n'.format(
                coverage_limit, group_size, base_db, las_files),
            'rm -f {}\n'.format(las_files),
        ]
        scripts.append(''.join(script_lines))
    jobs = list()
    for i, script in enumerate(scripts):
        job_id = 'rep_{:03d}'.format(i)
        script_dir = os.path.join('.', 'rep-scripts', job_id)
        script_fn = os.path.join(script_dir, 'run_REPmask.sh')
        io.mkdirs(script_dir)
        with open(script_fn, 'w') as stream:
            stream.write('{}\n'.format(script))
        # Record in a job-dict.
        job = dict()
        job['input'] = dict(
            config=config_fn,
            db=db_fn,
            script=script_fn,
        )
        job['output'] = dict(
            job_done='job.done'
        )
        job['params'] = dict(
        )
        job['wildcards'] = {wildcards: job_id}
        jobs.append(job)
    io.serialize(uows_fn, jobs)

def rep_apply(db_fn, script_fn, job_done_fn):
    # daligner would put track-files in the DB-directory, not '.',
    # so we need to symlink everything first.
    db = symlink_db(db_fn)
    symlink(script_fn)
    io.syscall('bash -vex {}'.format(os.path.basename(script_fn)))
    io.touch(job_done_fn)

def rep_combine(db_fn, gathered_fn, group_size, new_db_fn):
    new_db = os.path.splitext(new_db_fn)[0]
    db = symlink_db(db_fn)
    assert db == new_db, 'basename({!r})!={!r}, but old and new DB names must match.'.format(db_fn, new_db_fn)
    # Remove old, in case of resume.
    io.syscall('rm -f .{db}.*.rep{group_size}.anno .{db}.*.rep{group_size}.data'.format(**locals()))
    gathered = io.deserialize(gathered_fn)
    gathered_dn = os.path.dirname(gathered_fn)
    # Create symlinks for all track-files.
    for job in gathered:
        done_fn = job['job_done']
        done_dn = os.path.dirname(done_fn)
        if not os.path.isabs(done_dn):
            LOG.info('Found relative done-file: {!r}'.format(done_fn))
            done_dn = os.path.join(gathered_dn, done_dn)
        annos = glob.glob('{}/.{}.*.rep{}.anno'.format(done_dn, db, group_size))
        datas = glob.glob('{}/.{}.*.rep{}.data'.format(done_dn, db, group_size))
        assert len(annos) == len(datas), 'Mismatched globs:\n{!r}\n{!r}'.format(annos, datas)
        for fn in annos + datas:
            symlink(fn, force=False)
    cmd = 'Catrack -vdf {} rep{}'.format(db, group_size)
    io.syscall(cmd)

'''
daligner -v -k18 -w8 -h480 -e0.8 -P. /localdisk/scratch/cdunn/repo/FALCON-examples/run/greg200k-sv2/0-rawreads/tan-combine/raw_reads /localdisk/scratch/cdunn/repo/FALCON-examples/run/greg200k-sv2/0-rawreads/tan-combine/raw_reads && mv raw_reads.raw_reads.las raw_reads.R1.1.las
'''

def fake_rep_as_daligner_script_moved(script, dbname):
    """
    Special case:
        # Daligner jobs (1)
        daligner raw_reads raw_reads && mv raw_reads.raw_reads.las raw_reads.R1.1.las
    Well, unlike for daligner_split, here the block-number is there for this degenerate case. Good!

    We have db.Rn.block.las
    We want db.block.db.block.las, for now. (We will 'merge' this solo file later.)
    """
    re_script = re.compile(r'(mv\b.*\S+\s+)(\S+)$')  # no trailing newline, for now
    mo = re_script.search(script)
    if not mo:
        msg = 'Only 1 line in foo-jobs.01.OVL, but\n {!r} did not match\n {!r}.'.format(
            re_script.pattern, script)
        LOG.warning(msg)
        return script
    else:
        new_script = re_script.sub(r'\1{dbname}.1.{dbname}.1.las'.format(dbname=dbname), script, 1)
        msg = 'Only 1 line in foo-jobs.01.OVL:\n {!r} matches\n {!r}. Replacing with\n {!r}.'.format(
            re_script.pattern, script, new_script)
        LOG.warning(msg)
        return new_script
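
# Substitution sketch for the single-block case (dbname='raw_reads'):
#   in:  'daligner raw_reads raw_reads && mv raw_reads.raw_reads.las raw_reads.R1.1.las'
#   out: 'daligner raw_reads raw_reads && mv raw_reads.raw_reads.las raw_reads.1.raw_reads.1.las'
# Only the final mv target is rewritten, so downstream merging sees the
# standard pair-file naming.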

def fake_rep_as_daligner_script_unmoved(script, dbname):
    """
    Typical case:
        # Daligner jobs (N)
        daligner raw_reads raw_reads && mv raw_reads.3.raw_reads.3.las raw_reads.R1.3.las
    Well, unlike for daligner_split, here the block-number is there for this degenerate case. Good!

    We have db.Rn.block.las
    We want db.block.db.block.las. (We will merge later.)
    """
    re_script = re.compile(r'\s*\&\&\s*mv\s+.*$')  # no trailing newline, for now
    mo = re_script.search(script)
    if not mo:
        msg = 'Many lines in foo-jobs.01.OVL, but\n {!r} did not match\n {!r}.'.format(
            re_script.pattern, script)
        LOG.warning(msg)
        return script
    else:
        new_script = re_script.sub('', script, 1)
        msg = 'Many lines in foo-jobs.01.OVL:\n {!r} matches\n {!r}. Replacing with\n {!r}.'.format(
            re_script.pattern, script, new_script)
        LOG.warning(msg)
        return new_script

def _get_rep_daligner_split_noop_scripts(db_fn):
    """
    We need code to generate an empty .las file for each block.
    We do all in the same script to reduce the number of qsub calls for each iteration.
    (We cannot generate only 1 block because Catrack expects all N.)
    """
    with open(db_fn) as stream:
        nblocks = functional.dazzler_get_nblocks(stream)
    LOG.debug('Found {} blocks in DB {!r}'.format(nblocks, db_fn))
    db = os.path.splitext(db_fn)[0]
    dbname = os.path.basename(db)
    lines = []
    for i in range(1, nblocks + 1):
        las_fn = '{db}.{i}.{db}.{i}.las'.format(db=dbname, i=i)
        lines.append('python3 -m falcon_kit.mains.las_write_empty {}'.format(las_fn))
    script = '\n'.join(lines)
    return [script]

def _get_rep_daligner_split_scripts(config, db_fn, group_size, coverage_limit):
    db = os.path.splitext(db_fn)[0]
    dbname = os.path.basename(db)
    tracks = get_tracks(db_fn)
    # First, run HPC.REPmask immediately.
    script = ''.join([
        script_HPC_REPmask(config, db, tracks,
                           prefix='rep-jobs', group_size=group_size, coverage_limit=coverage_limit),
    ])
    script_fn = 'split_db.sh'
    with open(script_fn, 'w') as ofs:
        exe = bash.write_sub_script(ofs, script)
    io.syscall('bash -vex {}'.format(script_fn))
    # We now have files like rep-jobs.01.OVL.
    # We need to parse that one. (We ignore the others.)
    lines = open('rep-jobs.01.OVL').readlines()
    scripts = list()
    for line in lines:
        if line.startswith('#'):
            continue
        if not line.strip():
            continue
        scripts.append(line)
    if len(scripts) == 1:
        scripts = [fake_rep_as_daligner_script_moved(s, dbname) for s in scripts]
    else:
        scripts = [fake_rep_as_daligner_script_unmoved(s, dbname) for s in scripts]
    for i, script in enumerate(scripts):
        LAcheck = 'LAcheck -vS {} *.las'.format(db)
        script += '\n' + LAcheck + '\n'
        scripts[i] = script
    return scripts

def rep_daligner_split(config, config_fn, db_fn, nproc, wildcards, group_size, coverage_limit, uows_fn, bash_template_fn):
    """Similar to daligner_split(), but based on HPC.REPmask instead of HPC.daligner.
    """
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT)
    symlink_db(db_fn)
    if group_size == 0:
        scripts = _get_rep_daligner_split_noop_scripts(os.path.basename(db_fn))
    else:
        scripts = _get_rep_daligner_split_scripts(config, os.path.basename(db_fn), group_size, coverage_limit)
    jobs = list()
    for i, script in enumerate(scripts):
        job_id = 'rep_{:04d}'.format(i)
        script_dir = os.path.join('.', 'rep-scripts', job_id)
        script_fn = os.path.join(script_dir, 'run_daligner.sh')
        io.mkdirs(script_dir)
        with open(script_fn, 'w') as stream:
            stream.write('{}\n'.format(script))
        # Record in a job-dict.
        job = dict()
        job['input'] = dict(
            config=config_fn,
            db=db_fn,
            script=script_fn,
        )
        job['output'] = dict(
            job_done='job.done'
        )
        job['params'] = dict(
        )
        job['wildcards'] = {wildcards: job_id}
        jobs.append(job)
    io.serialize(uows_fn, jobs)

def get_tracks(db_fn):
    db_dirname, db_basename = os.path.split(db_fn)
    dbname = os.path.splitext(db_basename)[0]
    fns = glob.glob('{}/.{}.*.anno'.format(db_dirname, dbname))
    re_anno = re.compile(r'\.{}\.([^\.]+)\.anno'.format(dbname))
    tracks = [re_anno.search(fn).group(1) for fn in fns]
    return tracks
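
# Illustrative: with .raw_reads.dust.anno and .raw_reads.tan.anno sitting next
# to raw_reads.db, get_tracks('./raw_reads.db') returns ['dust', 'tan']
# (glob order, so treat the result as unordered).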

def daligner_split(config, config_fn, db_fn, nproc, wildcards, length_cutoff_fn, split_fn, bash_template_fn):
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT)
    symlink_db(db_fn)
    db = os.path.splitext(db_fn)[0]
    dbname = os.path.basename(db)
    tracks = get_tracks(db_fn)
    # We assume the actual DB will be symlinked here.
    base_db = os.path.basename(db)
    script = ''.join([
        script_HPC_daligner(config, base_db, length_cutoff_fn, tracks, prefix='daligner-jobs'),
    ])
    script_fn = 'split_db.sh'
    with open(script_fn, 'w') as ofs:
        exe = bash.write_sub_script(ofs, script)
    io.syscall('bash -vex {}'.format(script_fn))
    # We now have files like daligner-jobs.01.OVL.
    # We need to parse that one. (We ignore the others.)
    lines = open('daligner-jobs.01.OVL').readlines()
    preads_aln = True if dbname == 'preads' else False
    xformer = functional.get_script_xformer(preads_aln)
    LOG.debug('preads_aln={!r} (True => use daligner_p)'.format(preads_aln))
    scripts = list()
    for line in lines:
        if line.startswith('#'):
            continue
        if not line.strip():
            continue
        line = xformer(line)  # Use daligner_p for preads.
        scripts.append(line)
    """
    Special case:
        # Daligner jobs (1)
        daligner raw_reads raw_reads && mv raw_reads.raw_reads.las raw_reads.las
    In that case, the "block" name is empty. (See functional.py)
    We will rename the file. (LAmerge on a single input is a no-op, which is fine.)
    """
    if len(scripts) == 1:
        script = scripts[0]
        re_script = re.compile(r'(mv\b.*\S+\s+)(\S+)$')  # no trailing newline, for now
        mo = re_script.search(script)
        if not mo:
            msg = 'Only 1 line in daligner-jobs.01.OVL, but\n {!r} did not match\n {!r}.'.format(
                re_script.pattern, script)
            LOG.warning(msg)
        else:
            new_script = re_script.sub(r'\1{dbname}.1.{dbname}.1.las'.format(dbname=dbname), script, 1)
            msg = 'Only 1 line in daligner-jobs.01.OVL:\n {!r} matches\n {!r}. Replacing with\n {!r}.'.format(
                re_script.pattern, script, new_script)
            LOG.warning(msg)
            scripts = [new_script]
    for i, script in enumerate(scripts):
        LAcheck = 'LAcheck -vS {} *.las'.format(base_db)
        script += '\n' + LAcheck + '\n'
        scripts[i] = script
    jobs = list()
    uow_dirs = list()
    for i, script in enumerate(scripts):
        job_id = 'j_{:04d}'.format(i)
        script_dir = os.path.join('.', 'daligner-scripts', job_id)
        # Write for job-dict.
        script_fn = os.path.join(script_dir, 'run_daligner.sh')
        io.mkdirs(script_dir)
        with open(script_fn, 'w') as stream:
            stream.write(script)
        # Record in a job-dict.
        job = dict()
        job['input'] = dict(
            config=config_fn,
            db=db_fn,
            script=script_fn,
        )
        job['output'] = dict(
            job_done='daligner.done'
        )
        job['params'] = dict(
        )
        job['wildcards'] = {wildcards: job_id}
        jobs.append(job)
        # Write into a uow directory.
        uow_dn = 'uow-{:04d}'.format(i)
        io.mkdirs(uow_dn)
        with io.cd(uow_dn):
            script_fn = 'uow.sh'
            with open(script_fn, 'w') as stream:
                stream.write(script)
            # Add symlinks.
            symlink_db(os.path.join('..', base_db))
        uow_dirs.append(uow_dn)
    io.serialize(split_fn, jobs)
    # For Cromwell, we use a tar-file instead.
    move_into_tar('all-units-of-work', uow_dirs)

def move_into_tar(dn, fns):
    # Create directory 'dn'.
    # Move files (or dir-trees) into directory 'dn', and tar it.
    # By convention, for tar-file "foo.tar", we first move everything into a directory named "foo".
    io.mkdirs(dn)
    for fn in fns:
        cmd = 'mv {} {}'.format(fn, dn)
        io.syscall(cmd)
    tar_fn = '{}.tar'.format(dn)
    #with tarfile.TarFile(tar_fn, 'w', dereference=False, ignore_zeros=True, errorlevel=2) as tf:
    #    tf.add(dn)
    cmd = 'tar cvf {} {}'.format(tar_fn, dn)
    io.syscall(cmd)
    io.rmdirs(dn)

def daligner_apply(db_fn, script_fn, job_done_fn):
    symlink(script_fn)
    symlink_db(db_fn)
    io.syscall('bash -vex {}'.format(os.path.basename(script_fn)))
    io.touch(job_done_fn)

class MissingLas(Exception):
    pass

def is_perfect_square(n):
    import math
    root = round(math.sqrt(n))
    return n == root * root
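
# Why a perfect square: a complete daligner run of an N-block DB compares every
# block against every block, yielding N*N pair-files (db.a.db.b.las), e.g.
#   is_perfect_square(9)  -> True   (3 blocks, complete)
#   is_perfect_square(10) -> False  (something is missing)
# Note the check is currently advisory; the raise in daligner_combine() below
# is commented out.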

def daligner_combine(db_fn, gathered_fn, las_paths_fn):
    """Merge all .las pair-files from gathered daligner runs.
    Write a simple las_paths_fn (a JSON list of paths).
    """
    gathered = io.deserialize(gathered_fn)
    d = os.path.abspath(os.path.realpath(os.path.dirname(gathered_fn)))

    def abspath(fn):
        if os.path.isabs(fn):
            return fn  # I expect this never to happen though.
        return os.path.join(d, fn)
    job_done_fns = list()
    for job_output in gathered:
        for fn in job_output.values():
            abs_fn = abspath(fn)
            job_done_fns.append(abs_fn)
    #import pprint
    #LOG.info('job_done_fns: {}'.format(pprint.pformat(job_done_fns)))
    job_rundirs = sorted(os.path.dirname(fn) for fn in job_done_fns)
    import time

    def find_las_paths():
        las_paths = list()
        for uow_dir in job_rundirs:
            # We could assert the existence of a job_done file here.
            d = os.path.abspath(uow_dir)
            las_path_glob = glob.glob(os.path.join(d, '*.las'))
            LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
            if not las_path_glob:
                LOG.info('No .las found in {!r}. Sleeping {} seconds before retrying.'.format(d, WAIT))
                time.sleep(WAIT)
                las_path_glob = glob.glob(os.path.join(d, '*.las'))
                LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
                if not las_path_glob:
                    msg = 'No .las files found in daligner dir {!r}.'.format(d)
                    raise MissingLas(msg)
            las_paths.extend(las_path_glob)
        n = len(las_paths)
        if not is_perfect_square(n):
            #msg = '{} is not a perfect square. We must be missing some .las files.'.format(n)
            #raise MissingLas(msg)
            pass
        return las_paths
    try:
        las_paths = sorted(find_las_paths(), key=lambda fn: os.path.basename(fn))
    except MissingLas as exc:
        LOG.exception('Not enough .las found from {!r}. Sleeping {} seconds before retrying.'.format(
            gathered_fn, WAIT))
        time.sleep(WAIT)
        las_paths = find_las_paths()
    io.serialize(las_paths_fn, las_paths)

def merge_split(config_fn, dbname, las_paths_fn, wildcards, split_fn, bash_template_fn):
    with open(bash_template_fn, 'w') as stream:
        stream.write(pype_tasks.TASK_DB_LAMERGE_APPLY_SCRIPT)
    las_paths = io.deserialize(las_paths_fn)
    re_las_pair = re.compile(r'{db}\.(\d+)\.{db}\.(\d+)\.las$'.format(db=dbname))
    las_map = collections.defaultdict(list)
    for path in las_paths:
        mo = re_las_pair.search(path)
        if not mo:
            msg = '{!r} does not match regex {!r}'.format(
                path, re_las_pair.pattern)
            raise Exception(msg)
        a, b = int(mo.group(1)), int(mo.group(2))
        las_map[a].append(path)
    jobs = list()
    for i, block in enumerate(las_map):
        job_id = 'm_{:05d}'.format(i)
        # Write the las files for this job.
        input_dir = os.path.join('merge-scripts', job_id)
        las_paths_fn = os.path.join('.', input_dir, 'las-paths.json')
        io.mkdirs(input_dir)
        las_paths = las_map[block]
        io.serialize(las_paths_fn, las_paths)
        # Record in a job-dict.
        las_fn = '{}.{}.las'.format(dbname, block)
        job = dict()
        job['input'] = dict(
            config=config_fn,
            #db=db_fn,
            #script=os.path.abspath(script_fn),
            las_paths=las_paths_fn,
        )
        job['output'] = dict(
            #job_done = 'daligner.done'
            las_fn=las_fn
        )
        job['params'] = dict(
        )
        job['wildcards'] = {wildcards: job_id}
        jobs.append(job)
    io.serialize(split_fn, jobs)

def ichunked(seq, chunksize):
    """Yields items from an iterator in iterable chunks.
    https://stackoverflow.com/a/1335572
    """
    from itertools import chain, islice
    try:
        it = iter(seq)
        while True:
            yield chain([next(it)], islice(it, chunksize - 1))
    except StopIteration:
        return
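
# Usage sketch; each yielded chunk is a lazy iterator over the same underlying
# stream, so consume each chunk fully before advancing to the next:
#   [list(c) for c in ichunked(range(7), 3)]  ->  [[0, 1, 2], [3, 4, 5], [6]]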

def merge_apply(las_paths_fn, las_fn):
    """Merge the las files into one, a few at a time.
    This replaces the logic of HPC.daligner.
    """
    io.rm_force(las_fn)
    #all_las_paths = rel_to(io.deserialize(las_paths_fn), os.path.dirname(las_paths_fn))
    all_las_paths = io.deserialize(las_paths_fn)
    # Create symlinks, so system calls will be shorter.
    all_syms = list()
    for fn in all_las_paths:
        symlink(fn)
        all_syms.append(os.path.basename(fn))
    curr_paths = sorted(all_syms)
    # Merge a few at-a-time.
    at_a_time = 250  # max is 252 for LAmerge
    level = 1
    while len(curr_paths) > 1:
        level += 1
        next_paths = list()
        for i, paths in enumerate(ichunked(curr_paths, at_a_time)):
            tmp_las = 'L{}.{}.las'.format(level, i + 1)
            paths_arg = ' '.join(paths)
            cmd = 'LAmerge -v {} {}'.format(tmp_las, paths_arg)
            io.syscall(cmd)
            next_paths.append(tmp_las)
        curr_paths = next_paths
    # Save only the one we want.
    io.syscall('mv -f {} {}'.format(curr_paths[0], 'keep-this'))
    io.syscall('rm -f *.las')
    io.syscall('mv -f {} {}'.format('keep-this', las_fn))
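
# Merge-tree sketch (hypothetical counts): with 600 input .las files and
# at_a_time=250, level 2 produces L2.1.las, L2.2.las, L2.3.las (250+250+100
# inputs each); level 3 merges those three into a single file, which is then
# renamed to las_fn. The chunking exists because LAmerge caps its argument
# list (~252 files).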

def merge_combine(gathered_fn, las_paths_fn, block2las_fn):
    gathered = io.deserialize(gathered_fn)
    d = os.path.abspath(os.path.realpath(os.path.dirname(gathered_fn)))

    def abspath(fn):
        if os.path.isabs(fn):
            return fn  # I expect this never to happen though.
        return os.path.join(d, fn)
    las_fns = list()
    for job_output in gathered:
        assert len(job_output) == 1, 'len(job_output) == {} != 1'.format(len(job_output))
        for fn in list(job_output.values()):
            abs_fn = abspath(fn)
            las_fns.append(abs_fn)
    #import pprint
    #LOG.info('job_done_fns: {}'.format(pprint.pformat(job_done_fns)))
    import time
    #job_rundirs = sorted(os.path.dirname(fn) for fn in job_done_fns)
    #las_paths = list()
    #for uow_dir in job_rundirs:
    #    # We could assert the existence of a job_done file here.
    #    d = os.path.abspath(uow_dir)
    #    las_path_glob = glob.glob(os.path.join(d, '*.las'))
    #    LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
    #    if not las_path_glob:
    #        time.sleep(WAIT)
    #        las_path_glob = glob.glob(os.path.join(d, '*.las'))
    #        LOG.debug('dir={!r}, glob={!r}'.format(d, las_path_glob))
    #    if not las_path_glob:
    #        msg = 'Missing .las file. Skipping block for dir {}'.format(d)
    #        LOG.error(msg)
    #    if len(las_path_glob) > 1:
    #        msg = 'Expected exactly 1 .las in {!r}, but found {}:\n {!r}'.format(
    #            d, len(las_path_glob), las_path_glob)
    #        LOG.warning(msg)
    #    las_paths.extend(las_path_glob)
    las_paths = list()
    for las_fn in sorted(las_fns):
        if not os.path.exists(las_fn):
            msg = 'Did not find las-file {!r}. Waiting {} seconds.'.format(las_fn, WAIT)
            LOG.info(msg)
            time.sleep(WAIT)
            if not os.path.exists(las_fn):
                msg = 'Did not find las-file {!r}, even after waiting {} seconds. Maybe retry later?'.format(las_fn, WAIT)
                raise Exception(msg)
                #LOG.warning(las_fn)
                #continue
        las_paths.append(las_fn)
    # Map block nums to .las files.
    re_block = re.compile(r'\.(\d+)\.las$')
    block2las = dict()
    for fn in las_paths:
        mo = re_block.search(fn)
        if not mo:
            msg = 'Las file {!r} did not match regex {!r}.'.format(
                fn, re_block.pattern)
            raise Exception(msg)
        block = int(mo.group(1))
        block2las[block] = fn
    # Verify sequential block nums.
    blocks = sorted(block2las.keys())
    expected = list(range(1, len(blocks) + 1))
    if blocks != expected:
        msg = '{!r} has {} .las files, but their block-numbers are not sequential: {!r}'.format(
            gathered_fn, len(blocks), blocks)
        raise Exception(msg)
    # Serialize the result, plus an archaic file.
    io.serialize(las_paths_fn, sorted(block2las.values()))
    io.serialize(block2las_fn, block2las)

def setup_logging(log_level):
    hdlr = logging.StreamHandler(sys.stderr)
    hdlr.setLevel(log_level)
    hdlr.setFormatter(logging.Formatter('[%(levelname)s]%(message)s'))
    LOG.addHandler(hdlr)
    LOG.setLevel(logging.NOTSET)
    LOG.info('Log-level: {}'.format(log_level))

def cmd_build(args):
    ours = get_ours(args.config_fn, args.db_fn)
    build_db(ours, args.input_fofn_fn, args.db_fn, args.length_cutoff_fn)

def cmd_track_combine(args):
    track_combine(args.db_fn, args.track, args.anno_fofn_fn, args.data_fofn_fn)

def cmd_tan_split(args):
    ours = get_ours(args.config_fn, args.db_fn)
    tan_split(ours, args.config_fn, args.db_fn, args.split_fn, args.bash_template_fn)

def cmd_tan_apply(args):
    tan_apply(args.db_fn, args.script_fn, args.job_done_fn)

def cmd_tan_combine(args):
    tan_combine(args.db_fn, args.gathered_fn, args.new_db_fn)

def cmd_rep_split(args):
    ours = get_ours(args.config_fn, args.db_fn)
    rep_split(
        ours, args.config_fn, args.db_fn,
        args.las_paths_fn, args.wildcards,
        args.group_size, args.coverage_limit,
        args.split_fn, args.bash_template_fn,
    )

def cmd_rep_apply(args):
    rep_apply(args.db_fn, args.script_fn, args.job_done_fn)

def cmd_rep_combine(args):
    rep_combine(args.db_fn, args.gathered_fn, args.group_size, args.new_db_fn)

def cmd_rep_daligner_split(args):
    ours = get_ours(args.config_fn, args.db_fn)
    rep_daligner_split(
        ours, args.config_fn, args.db_fn, args.nproc,
        args.wildcards, args.group_size, args.coverage_limit,
        args.split_fn, args.bash_template_fn,
    )

def cmd_daligner_split(args):
    ours = get_ours(args.config_fn, args.db_fn)
    daligner_split(
        ours, args.config_fn, args.db_fn, args.nproc,
        args.wildcards, args.length_cutoff_fn,
        args.split_fn, args.bash_template_fn,
    )

def cmd_daligner_apply(args):
    daligner_apply(args.db_fn, args.script_fn, args.job_done_fn)

def cmd_daligner_combine(args):
    daligner_combine(args.db_fn, args.gathered_fn, args.las_paths_fn)

def cmd_merge_split(args):
    merge_split(
        args.config_fn, args.db_prefix, args.las_paths_fn,
        args.wildcards,
        args.split_fn, args.bash_template_fn,
    )

def cmd_merge_apply(args):
    merge_apply(args.las_paths_fn, args.las_fn)

def cmd_merge_combine(args):
    merge_combine(args.gathered_fn, args.las_paths_fn, args.block2las_fn)

options_note = """
For raw_reads.db, we also look for the following config keys:
- pa_DBsplit_option
- pa_HPCdaligner_option
- pa_HPCTANmask_option
- pa_daligner_option
- length_cutoff: -1 => calculate based on "genome_size" and "seed_coverage" config.
- seed_coverage
- genome_size
For preads.db, these are named:
- ovlp_DBsplit_option
- ovlp_HPCdaligner_option
- ovlp_daligner_option
- length_cutoff_pr
"""

def get_ours(config_fn, db_fn):
    ours = dict()
    config = io.deserialize(config_fn)
    ours['genome_size'] = int(config['genome_size'])
    ours['seed_coverage'] = float(config['seed_coverage'])
    if os.path.basename(db_fn).startswith('preads'):
        ours['DBdust_opt'] = config.get('ovlp_DBdust_option', '')
        ours['DBsplit_opt'] = config.get('ovlp_DBsplit_option', '')
        ours['daligner_opt'] = config.get('ovlp_daligner_option', '') + ' ' + config.get('ovlp_HPCdaligner_option', '')
        ours['user_length_cutoff'] = int(config.get('length_cutoff_pr', '0'))
        ours['fasta_filter_option'] = 'pass'
        ours['genome_size'] = int(config.get('genome_size', 0))
        ours['subsample_coverage'] = 0
        ours['subsample_random_seed'] = int(config.get('pa_subsample_random_seed', 0))
        ours['subsample_strategy'] = config.get('pa_subsample_strategy', 'random')
    else:
        ours['DBdust_opt'] = config.get('pa_DBdust_option', '')
        ours['DBsplit_opt'] = config.get('pa_DBsplit_option', '')
        ours['daligner_opt'] = config.get('pa_daligner_option', '') + ' ' + config.get('pa_HPCdaligner_option', '')
        ours['TANmask_opt'] = config.get('pa_daligner_option', '') + ' ' + config.get('pa_HPCTANmask_option', '')
        ours['REPmask_opt'] = config.get('pa_daligner_option', '') + ' ' + config.get('pa_HPCREPmask_option', '')
        ours['user_length_cutoff'] = int(config.get('length_cutoff', '0'))
        ours['fasta_filter_option'] = config.get('pa_fasta_filter_option', 'pass')
        ours['genome_size'] = int(config.get('genome_size', 0))
        ours['subsample_coverage'] = int(config.get('pa_subsample_coverage', 0))
        ours['subsample_random_seed'] = int(config.get('pa_subsample_random_seed', 0))
        ours['subsample_strategy'] = config.get('pa_subsample_strategy', 'random')
    LOG.info('config({!r}):\n{}'.format(config_fn, config))
    LOG.info('our subset of config:\n{}'.format(ours))
    return ours
  1039. def add_build_arguments(parser):
  1040. parser.add_argument(
  1041. '--input-fofn-fn', required=True,
  1042. help='input. User-provided file of fasta filename. Relative paths are relative to directory of FOFN.',
  1043. )
  1044. parser.add_argument(
  1045. '--length-cutoff-fn', required=True,
  1046. help='output. Simple file of a single integer, either calculated or specified by --user-length-cutoff.'
  1047. )
  1048. def add_track_combine_arguments(parser):
  1049. parser.add_argument(
  1050. '--anno-fofn-fn', required=True,
  1051. help='input. FOFN for .anno track files, which are 1:1 with .data track files.',
  1052. )
  1053. parser.add_argument(
  1054. '--data-fofn-fn', required=True,
  1055. help='input. FOFN for .data track files, which are 1:1 with .anno track files.',
  1056. )
  1057. parser.add_argument(
  1058. '--track', required=True,
  1059. help='Name of the Dazzler DB track. (e.g. "tan" or "rep1")',
  1060. )
  1061. def add_tan_split_arguments(parser):
  1062. parser.add_argument(
  1063. '--split-fn', default='tan-mask-uows.json',
  1064. help='output. Units-of-work from HPC.TANmask, for datander.',
  1065. )
  1066. parser.add_argument(
  1067. '--bash-template-fn', default='bash-template.sh',
  1068. help='output. Script to apply later.',
  1069. )
  1070. def add_tan_apply_arguments(parser):
  1071. parser.add_argument(
  1072. '--script-fn', required=True,
  1073. help='input. Script to run datander.',
  1074. )
  1075. parser.add_argument(
  1076. '--job-done-fn', default='job.done',
  1077. help='output. Sentinel.',
  1078. )
  1079. def add_tan_combine_arguments(parser):
  1080. parser.add_argument(
  1081. '--gathered-fn', required=True,
  1082. help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The tan-track files are next to these.',
  1083. )
  1084. parser.add_argument(
  1085. '--new-db-fn', required=True,
  1086. help='output. This must match the input DB name. It will be symlinked, except the new track files.',
  1087. )
  1088. def add_rep_split_arguments(parser):
  1089. parser.add_argument(
  1090. '--wildcards', #default='rep1_id',
  1091. help='Comma-separated string of keys to be subtituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
  1092. )
  1093. parser.add_argument(
  1094. '--group-size', '-g', required=True, type=int,
  1095. help='Number of blocks per group. This should match what was passed to HPC.REPmask. Here, it becomes part of the mask name, repN.',
  1096. )
  1097. parser.add_argument(
  1098. '--coverage-limit', '-c', required=True, type=int,
  1099. help='Coverage threshold for masking.',
  1100. )
  1101. parser.add_argument(
  1102. '--las-paths-fn', required=True,
  1103. help='input. JSON list of las paths. These will have the format of standard daligner/LAmerge (foo.N.las), rather than of REPmask (foo.R1.N.las).',
  1104. # so we will symlink as we use? (TODO: Also delete after use?)
  1105. )
  1106. parser.add_argument(
  1107. '--split-fn', default='rep-mask-uows.json',
  1108. help='output. Units-of-work from earlier HPC.REPmask, for REPmask.',
  1109. )
  1110. parser.add_argument(
  1111. '--bash-template-fn', default='bash-template.sh',
  1112. help='output. Script to apply later.',
  1113. )
  1114. def add_rep_apply_arguments(parser):
  1115. parser.add_argument(
  1116. '--script-fn', required=True,
  1117. help='input. Script to run REPmask.',
  1118. )
  1119. parser.add_argument(
  1120. '--job-done-fn', default='job.done',
  1121. help='output. Sentinel.',
  1122. )
  1123. def add_rep_combine_arguments(parser):
  1124. parser.add_argument(
  1125. '--group-size', '-g', required=True, type=int,
  1126. help='Number of blocks per group. This should match what was passed to HPC.REPmask. Here, it becomes part of the mask name, repN.',
  1127. )
  1128. parser.add_argument(
  1129. '--gathered-fn', required=True,
  1130. help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The rep-track files are next to these.',
  1131. )
  1132. parser.add_argument(
  1133. '--new-db-fn', required=True,
  1134. help='output. This must match the input DB name. It will be symlinked, except the new track files.',
  1135. )
  1136. def add_rep_daligner_split_arguments(parser):
  1137. parser.add_argument(
  1138. '--wildcards', default='dummy_wildcard',
  1139. help='Comma-separated string of keys to be subtituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
  1140. )
  1141. parser.add_argument(
  1142. '--group-size', '-g', required=True, type=int,
  1143. help='Number of blocks per group. This should match what was passed to HPC.REPmask. Here, it becomes part of the mask name, repN.',
  1144. )
  1145. parser.add_argument(
  1146. '--coverage-limit', '-c', required=True, type=int,
  1147. help='Coverage threshold for masking.',
  1148. )
  1149. parser.add_argument(
  1150. '--split-fn', default='rep-daligner-uows.json',
  1151. help='output. Units-of-work from HPC.REPmask, for daligner.',
  1152. )
  1153. parser.add_argument(
  1154. '--bash-template-fn', default='bash-template.sh',
  1155. help='output. Script to apply later.',
  1156. )
  1157. def add_daligner_split_arguments(parser):
  1158. parser.add_argument(
  1159. '--wildcards', default='dummy_wildcard',
  1160. help='Comma-separated string of keys to be subtituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
  1161. )
  1162. parser.add_argument(
  1163. '--length-cutoff-fn', required=True,
  1164. help='input. Simple file of a single integer, either calculated or specified by --user-length-cutoff.'
  1165. )
  1166. parser.add_argument(
  1167. '--split-fn', default='daligner-mask-uows.json',
  1168. help='output. Units-of-work from HPC.daligner, for daligner.',
  1169. )
  1170. parser.add_argument(
  1171. '--bash-template-fn', default='bash-template.sh',
  1172. help='output. Script to apply later.',
  1173. )
  1174. def add_daligner_apply_arguments(parser):
  1175. parser.add_argument(
  1176. '--script-fn', required=True,
  1177. help='input. Script to run daligner.',
  1178. )
  1179. parser.add_argument(
  1180. '--job-done-fn', default='job.done',
  1181. help='output. Sentinel.',
  1182. )
  1183. def add_daligner_combine_arguments(parser):
  1184. parser.add_argument(
  1185. '--gathered-fn', required=True,
  1186. help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The .las files are next to these.',
  1187. )
  1188. parser.add_argument(
  1189. '--las-paths-fn', required=True,
  1190. help='output. JSON list of las paths.')

def add_merge_split_arguments(parser):
    parser.add_argument(
        '--wildcards', default='mer0_id',
        help='Comma-separated string of keys to be substituted into output paths for each job, if any. (Helps with snakemake and pypeflow; not needed in pbsmrtpipe, since outputs are pre-determined.)',
    )
    parser.add_argument(
        '--db-prefix', required=True,
        help='DB is named "prefix.db", and the prefix is expected to match the .las files.',
    )
    parser.add_argument(
        '--las-paths-fn',
        help='input. foo.a.foo.b.las files from daligner.',
    )
    parser.add_argument(
        '--split-fn', default='merge-uows.json',
        help='output. Units-of-work for LAmerge.',
    )
    parser.add_argument(
        '--bash-template-fn', default='bash-template.sh',
        help='output. Script to apply later.',
    )

def add_merge_apply_arguments(parser):
    parser.add_argument(
        '--las-paths-fn', required=True,
        help='input. JSON list of las paths to merge. These must be .las "pairs" (foo.a.foo.b.las). The a-blocks must all be the same. Ultimately, we will generate a single .las from these, named after the a-block.',
    )
    parser.add_argument(
        '--las-fn', required=True,
        help='output. The merged las-file.',
    )

def add_merge_combine_arguments(parser):
    parser.add_argument(
        '--gathered-fn', required=True,
        help='input. List of sentinels. Produced by gen_parallel_tasks() gathering. The .las files are next to these.',
    )
    parser.add_argument(
        '--las-paths-fn', required=True,
        help='output. JSON list of las paths.',
    )
    parser.add_argument(
        '--block2las-fn', required=True,
        help='output. JSON dict of block (int) to las.',
    )
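
# RawTextHelpFormatter disables argparse's re-wrapping of description/epilog
# and help text, while ArgumentDefaultsHelpFormatter appends each option's
# default value to its help. Subclassing both combines the two behaviors.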
class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass

def parse_args(argv):
    description = 'Basic daligner steps: build; split into units-of-work; combine results and prepare for next step.'
    epilog = 'These tasks perform the split/apply/combine strategy (of which map/reduce is a special case).' + options_note
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=HelpF,
    )
    parser.add_argument(
        '--log-level', default='INFO',
        help='Python logging level.',
    )
    parser.add_argument(
        '--nproc', type=int, default=0,
        help='Ignored for now, but non-zero will mean "no more than this".',
    )
    parser.add_argument(
        '--config-fn', required=True,
        help='Input. JSON of user-configuration. (This is probably the [General] section.)',
    )
    parser.add_argument(
        '--db-fn', default='raw_reads.db',
        help='Input or Output. Dazzler DB. (Dot-files are implicit.)',
    )
    help_build = 'build Dazzler DB for raw_reads; calculate length-cutoff for HGAP seed reads; split Dazzler DB into blocks; run DBdust to mask low-complexity regions'
    help_track_combine = 'given .anno and .data fofns, symlink into CWD, then run Catrack on partial track-files, to produce a mask'
    help_tan_split = 'generate units-of-work for datander, via HPC.TANmask'
    help_tan_apply = 'run datander and TANmask as a unit-of-work (according to output of HPC.TANmask), and remove .las files'
    help_tan_combine = 'run Catrack on partial track-files, to produce a "tan" mask'
    help_rep_split = 'generate units-of-work for REPmask, given earlier HPC.REPmask; daligner and LAmerge have already occurred'
    help_rep_apply = 'run daligner and REPmask as a unit-of-work (according to output of HPC.REPmask), and remove .las files'
    help_rep_combine = 'run Catrack on partial track-files, to produce a "rep" mask'
    help_rep_daligner_split = 'generate units-of-work for daligner, via HPC.REPmask; should be followed by daligner-apply and daligner-combine, then merge-*, then rep-*'
    help_daligner_split = 'generate units-of-work for daligner, via HPC.daligner'
    help_daligner_apply = 'run daligner as a unit-of-work (according to output of HPC.daligner)'
    help_daligner_combine = 'generate a file of .las files, plus units-of-work for LAmerge (alias for merge-split)'
    help_merge_split = 'generate a file of .las files, plus units-of-work for LAmerge (alias for daligner-combine)'
    help_merge_apply = 'run merge as a unit-of-work, and (possibly) remove .las files'
    help_merge_combine = 'generate a file of .las files'
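    # Typical ordering, per the help strings above: rep-daligner-split is
    # followed by daligner-apply and daligner-combine, then the merge-* steps,
    # then the rep-* steps.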
    subparsers = parser.add_subparsers(help='sub-command help')
    parser_build = subparsers.add_parser('build',
        formatter_class=HelpF,
        description=help_build,
        help=help_build)
    add_build_arguments(parser_build)
    parser_build.set_defaults(func=cmd_build)
    parser_track_combine = subparsers.add_parser('track-combine',
        formatter_class=HelpF,
        description=help_track_combine,
        epilog='To use these as a mask, subsequent steps will need to add "-mTRACK".',
        help=help_track_combine)
    add_track_combine_arguments(parser_track_combine)
    parser_track_combine.set_defaults(func=cmd_track_combine)
    parser_tan_split = subparsers.add_parser('tan-split',
        formatter_class=HelpF,
        description=help_tan_split,
        epilog='',
        help=help_tan_split)
    add_tan_split_arguments(parser_tan_split)
    parser_tan_split.set_defaults(func=cmd_tan_split)
    parser_tan_apply = subparsers.add_parser('tan-apply',
        formatter_class=HelpF,
        description=help_tan_apply,
        epilog='',
        help=help_tan_apply)
    add_tan_apply_arguments(parser_tan_apply)
    parser_tan_apply.set_defaults(func=cmd_tan_apply)
    parser_tan_combine = subparsers.add_parser('tan-combine',
        formatter_class=HelpF,
        description=help_tan_combine,
        epilog='The result will be mostly symlinks, plus new tan-track files. To use these as a mask, subsequent steps will need to add "-mtan".',
        help=help_tan_combine)
    add_tan_combine_arguments(parser_tan_combine)
    parser_tan_combine.set_defaults(func=cmd_tan_combine)
    parser_rep_split = subparsers.add_parser('rep-split',
        formatter_class=HelpF,
        description=help_rep_split,
        epilog='',
        help=help_rep_split)
    add_rep_split_arguments(parser_rep_split)
    parser_rep_split.set_defaults(func=cmd_rep_split)
    parser_rep_apply = subparsers.add_parser('rep-apply',
        formatter_class=HelpF,
        description=help_rep_apply,
        epilog='',
        help=help_rep_apply)
    add_rep_apply_arguments(parser_rep_apply)
    parser_rep_apply.set_defaults(func=cmd_rep_apply)
    parser_rep_combine = subparsers.add_parser('rep-combine',
        formatter_class=HelpF,
        description=help_rep_combine,
        epilog='The result will be mostly symlinks, plus new rep-track files. To use these as a mask, subsequent steps will need to add "-mrep".',
        help=help_rep_combine)
    add_rep_combine_arguments(parser_rep_combine)
    parser_rep_combine.set_defaults(func=cmd_rep_combine)
    parser_rep_daligner_split = subparsers.add_parser('rep-daligner-split',
        formatter_class=HelpF,
        description=help_rep_daligner_split,
        epilog='HPC.REPmask will be passed mask flags for any mask tracks which we glob.',
        help=help_rep_daligner_split)
    add_rep_daligner_split_arguments(parser_rep_daligner_split)
    parser_rep_daligner_split.set_defaults(func=cmd_rep_daligner_split)
    parser_daligner_split = subparsers.add_parser('daligner-split',
        formatter_class=HelpF,
        description=help_daligner_split,
        epilog='HPC.daligner will be passed mask flags for any mask tracks which we glob. Note: if the db is named "preads.db", we run daligner_p instead of daligner.',
        help=help_daligner_split)
    add_daligner_split_arguments(parser_daligner_split)
    parser_daligner_split.set_defaults(func=cmd_daligner_split)
    parser_daligner_apply = subparsers.add_parser('daligner-apply',
        formatter_class=HelpF,
        description=help_daligner_apply,
        epilog='',
        help=help_daligner_apply)
    add_daligner_apply_arguments(parser_daligner_apply)
    parser_daligner_apply.set_defaults(func=cmd_daligner_apply)
    #parser_untar_daligner_apply = subparsers.add_parser('untar-daligner-apply',
    #    formatter_class=HelpF,
    #    description=help_untar_daligner_apply,
    #    epilog='',
    #    help=help_untar_daligner_apply)
    #add_untar_daligner_apply_arguments(parser_untar_daligner_apply)
    #parser_untar_daligner_apply.set_defaults(func=cmd_untar_daligner_apply)
    parser_daligner_combine = subparsers.add_parser('daligner-combine',
        formatter_class=HelpF,
        description=help_daligner_combine,
        epilog='',
        help=help_daligner_combine)
    add_daligner_combine_arguments(parser_daligner_combine)
    parser_daligner_combine.set_defaults(func=cmd_daligner_combine)
    parser_merge_split = subparsers.add_parser('merge-split',
        formatter_class=HelpF,
        description=help_merge_split,
        epilog='HPC.merge will be passed mask flags for any mask tracks which we glob.',
        help=help_merge_split)
    add_merge_split_arguments(parser_merge_split)
    parser_merge_split.set_defaults(func=cmd_merge_split)
    parser_merge_apply = subparsers.add_parser('merge-apply',
        formatter_class=HelpF,
        description=help_merge_apply,
        epilog='Ultimately, there will be 1 .las per block.',
        help=help_merge_apply)
    add_merge_apply_arguments(parser_merge_apply)
    parser_merge_apply.set_defaults(func=cmd_merge_apply)
    parser_merge_combine = subparsers.add_parser('merge-combine',
        formatter_class=HelpF,
        description=help_merge_combine,
        epilog='',
        help=help_merge_combine)
    add_merge_combine_arguments(parser_merge_combine)
    parser_merge_combine.set_defaults(func=cmd_merge_combine)
    args = parser.parse_args(argv[1:])
    return args
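
# A hypothetical invocation (sketch only; the file names are illustrative).
# Global options precede the sub-command, which argparse requires here:
#   python dazzler.py --config-fn=General_config.json --db-fn=raw_reads.db \
#       daligner-split --length-cutoff-fn=length_cutoff \
#       --split-fn=daligner-mask-uows.json --bash-template-fn=bash-template.sh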

def main(argv=sys.argv):
    args = parse_args(argv)
    setup_logging(args.log_level)
    LOG.info("====>[{}],\n====>args={}".format(args.func, args))
    args.func(args)

if __name__ == '__main__':  # pragma: no cover
    main()
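
# The dispatch above implements the split/apply/combine strategy named in the
# parser epilog. Conceptually (a sketch; these names are illustrative, not
# part of this module):
#
#     uows = split(work)                  # e.g. daligner-split writes a
#                                         # units-of-work JSON
#     results = [apply(u) for u in uows]  # e.g. daligner-apply, one job per
#                                         # unit-of-work
#     combined = combine(results)         # e.g. daligner-combine gathers the
#                                         # .las paths
#
# Map/reduce is the special case where each "apply" is an independent map and
# "combine" is a pure reduction.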