# run1.py
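"""Workflow driver for fc_run.

Builds a pypeflow workflow for the assembly stages laid out below
(0-rawreads, 1-preads_ovl, 2-asm-falcon) and runs it to completion.
(Module docstring added for orientation; stage names and the CLI are
taken from the code below.)
"""
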

from ..pype import (wrap_gen_task as gen_task, gen_parallel_tasks, Dist)
from .. import run_support
from .. import bash, pype_tasks, snakemake
from ..util.system import (only_these_symlinks, lfs_setstripe_maybe)
from .. import io
from .. import functional
# pylint: disable=no-name-in-module, import-error, fixme, line-too-long
from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
                                             makePypeLocalFile, PypeTask)
import argparse
import glob
import json
import logging
import os
import re
import sys
import time

LOG = logging.getLogger(__name__)  # default, for remote tasks


def check_general_config(general_config, input_config_fn):
    if ('pa_daligner_option' not in general_config or
            'ovlp_daligner_option' not in general_config):
        msg = '''Missing options.
We now require both "pa_daligner_option" (stage 0) and "ovlp_daligner_option" (stage 1),
which are automatically passed along to
 HPC.daligner
 HPC.TANmask
 HPC.REPmask
These can provide additional flags:
 pa_HPCdaligner_option
 pa_HPCTANmask_option
 ovlp_HPCdaligner_option
 pa_REPmask_code (-g/-c pairs for 3 iterations, e.g. '1,20;5,15;20,10')
'''
        raise Exception(msg)
    required = ('input_fofn', 'genome_size')
    for name in required:
        assert name in general_config, 'Missing "{}" in {}.'.format(name, input_config_fn)
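

# A minimal '[General]' sketch that would satisfy check_general_config()
# (the option values here are illustrative only, not tuned recommendations):
#
#   [General]
#   input_fofn = input.fofn
#   genome_size = 4652500
#   pa_daligner_option = -e.80 -l1000 -k18 -s100
#   ovlp_daligner_option = -e.96 -l1800 -k24 -s100
#   pa_REPmask_code = 1,20;5,15;20,10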


def main1(prog_name, input_config_fn, logger_config_fn=None):
    global LOG
    LOG = run_support.setup_logger(logger_config_fn)
    lfs_setstripe_maybe(path='.', stripe=12)
    LOG.info('fc_run started with configuration %s', input_config_fn)
    try:
        config = run_support.parse_cfg_file(input_config_fn)
        dumped = json.dumps(config, indent=2, separators=(',', ': '), sort_keys=True)
        LOG.info('cfg=\n{}'.format(dumped))
    except Exception:
        LOG.exception('Failed to parse config "{}".'.format(input_config_fn))
        raise
    general_config = config['General']
    check_general_config(general_config, input_config_fn)
    input_fofn_fn = general_config['input_fofn']
    genome_size = int(general_config['genome_size'])
    squash = 0 < genome_size < 1000000
    wf = PypeProcWatcherWorkflow(
        job_defaults=config['job.defaults'],
        squash=squash,
    )
    general_config['ver'] = '100'
    # Store config as JSON, available to many tasks.
    config_fn = './config.json'  # must not be in a task-dir
    io.serialize(config_fn, config)
    run(wf, config,
        os.path.abspath(config_fn),
        input_fofn_fn=input_fofn_fn,
        )


def add_bam2dexta_tasks(
        wf,
        config,
        input_fofn_fn, rawread_dir):
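    """Convert a .xml dataset (of BAM inputs) into .dexta files.

    Split the bam2dexta work into units, run them in parallel, then combine
    the results into a new input FOFN and return that FOFN's path. (Summary
    added for orientation; the actual behavior lives in the
    pype_tasks.TASK_BAM2DEXTA_* scripts.)
    """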
    # run bam2dexta
    bam2dexta_uows_fn = os.path.join(
        rawread_dir, 'bam2dexta-split', 'bam2dexta-uows.json')
    bam2dexta_bash_template_fn = os.path.join(
        rawread_dir, 'bam2dexta-split', 'bash_template.sh')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_BAM2DEXTA_SPLIT_SCRIPT,
        inputs={
            'bam': input_fofn_fn,
        },
        outputs={
            'split': bam2dexta_uows_fn,
            'bash_template': bam2dexta_bash_template_fn,
        },
        parameters={
            'wildcards': 'bam2dexta_id',
        },
        dist=Dist(local=True),
    ))
    gathered_fn = os.path.join(rawread_dir, 'bam2dexta-gathered', 'gathered-dexta-files.json')
    gen_parallel_tasks(
        wf,
        bam2dexta_uows_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=bam2dexta_bash_template_fn,
            script='fubar-TODO',  # pype_tasks.TASK_DB_TAN_APPLY_SCRIPT, # for snakemake stuff
            inputs={
                'units_of_work': '0-rawreads/bam2dexta-chunks/{bam2dexta_id}/some-units-of-work.json',
            },
            outputs={
                'results': '0-rawreads/bam2dexta-runs/{bam2dexta_id}/some-done-files.json',
            },
            parameters={},
        ),
        dist=Dist(NPROC=1, MB=4000, job_dict=config['job.step.da']),
    )
    input_fofn_fn = os.path.join(rawread_dir, 'bam2dexta-combine', 'input.fofn')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_BAM2DEXTA_COMBINE_SCRIPT,
        inputs={
            'gathered': gathered_fn,
        },
        outputs={
            'fofn': input_fofn_fn,
        },
        parameters={},
        dist=Dist(local=True),
    ))
    return input_fofn_fn


def run(wf, config,
        config_fn,
        input_fofn_fn,
        ):
    """
    Preconditions (for now):
    * LOG
    * run_support.logger
    """
    parsed_config = io.deserialize(config_fn)
    if parsed_config != config:
        msg = 'Config from {!r} != passed config'.format(config_fn)
        raise Exception(msg)
    general_config = config['General']
    general_config_fn = os.path.join(os.path.dirname(config_fn), 'General_config.json')
    io.serialize(general_config_fn, general_config)  # Some tasks use this.
    rawread_dir = '0-rawreads'
    pread_dir = '1-preads_ovl'
    falcon_asm_dir = '2-asm-falcon'
    for d in (rawread_dir, pread_dir, falcon_asm_dir):
        run_support.make_dirs(d)
    # only matters for parallel jobs
    job_defaults = config['job.defaults']
    #exitOnFailure = bool(job_defaults.get('stop_all_jobs_on_failure', False))
    global default_njobs
    default_njobs = int(job_defaults.get('njobs', 7))
    wf.max_jobs = default_njobs
    assert general_config['input_type'] in (
        'raw', 'preads'), 'Invalid input_type=={!r}'.format(general_config['input_type'])
    parameters = {}
    # ====> 0-rawreads
    if general_config['input_type'] == 'raw':
        # Most common workflow: Start with rawreads.
        if input_fofn_fn.endswith('.xml'):
            input_fofn_fn = add_bam2dexta_tasks(wf, config, input_fofn_fn, rawread_dir)
        # import sequences into daligner DB
        # calculate length_cutoff (if specified as -1)
        # split DB
        # run DBdust
        r_db_dust_fn = os.path.join(rawread_dir, 'build', 'raw_reads.db')
        length_cutoff_fn = os.path.join(rawread_dir, 'build', 'length_cutoff')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_DB_BUILD_SCRIPT,
            inputs={
                'config': general_config_fn,
                'input_fofn': input_fofn_fn,
            },
            outputs={
                'length_cutoff': length_cutoff_fn,
                'db': r_db_dust_fn,
                # Also .raw_reads.*, of course. And dust track.
            },
            parameters=dict(),
            dist=Dist(NPROC=1, job_dict=config['job.step.dust']),
        ))
        # run TANmask
        tan_uows_fn = os.path.join(
            rawread_dir, 'tan-split', 'tan-uows.json')
        tan_bash_template_fn = os.path.join(
            rawread_dir, 'tan-split', 'bash_template.sh')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_DB_TAN_SPLIT_SCRIPT,
            inputs={
                'config': general_config_fn,
                'db': r_db_dust_fn,
            },
            outputs={
                'split': tan_uows_fn,
                'bash_template': tan_bash_template_fn,
            },
            parameters={},
            dist=Dist(NPROC=1),
        ))
        gathered_fn = os.path.join(rawread_dir, 'tan-gathered', 'gathered-done-files.json')
        gen_parallel_tasks(
            wf,
            tan_uows_fn, gathered_fn,
            run_dict=dict(
                bash_template_fn=tan_bash_template_fn,
                script='fubar-TODO',  # pype_tasks.TASK_DB_TAN_APPLY_SCRIPT, # for snakemake stuff
                inputs={
                    'units_of_work': '0-rawreads/tan-chunks/{tan0_id}/some-units-of-work.json',
                },
                outputs={
                    #'job_done': '0-rawreads/{dal0_id}/daligner.done',
                    'results': '0-rawreads/tan-runs/{tan0_id}/some-done-files.json',
                },
                parameters={},
            ),
            dist=Dist(NPROC=4, MB=4000, job_dict=config['job.step.da']),
        )
        r_db_tan_fn = os.path.join(rawread_dir, 'tan-combine', 'raw_reads.db')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_DB_TAN_COMBINE_SCRIPT,
            inputs={
                'config': general_config_fn,
                'db': r_db_dust_fn,
                'gathered': gathered_fn,
            },
            outputs={
                'new_db': r_db_tan_fn,
            },
            parameters={},
            dist=Dist(local=True),
        ))
        #### HPC.REPmask/daligner/LAmerge
        codes = functional.parse_REPmask_code(general_config['pa_REPmask_code'])
        LOG.info('Parsed pa_REPmask_code (repa,repb,repc): {!r}'.format(codes))
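        # codes is expected to hold three (group_size, coverage_limit) pairs,
        # e.g. [(1, 20), (5, 15), (20, 10)] for pa_REPmask_code='1,20;5,15;20,10'.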
        ### REPmask tasks (a, b, c)
        letter = 'a'
        group_size, coverage_limit = codes[0]
        i_db_fn = r_db_tan_fn
        o_db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
                                general_config_fn, i_db_fn, length_cutoff_fn,
                                letter, group_size, coverage_limit)
        letter = 'b'
        group_size, coverage_limit = codes[1]
        i_db_fn = o_db_fn
        o_db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
                                general_config_fn, i_db_fn, length_cutoff_fn,
                                letter, group_size, coverage_limit)
        letter = 'c'
        group_size, coverage_limit = codes[2]
        i_db_fn = o_db_fn
        o_db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
                                general_config_fn, i_db_fn, length_cutoff_fn,
                                letter, group_size, coverage_limit)
        r_db_rep_fn = o_db_fn
        #### basic daligner/LAmerge
        p_id2las_fn = os.path.join(rawread_dir, 'las-merge-combine', 'p_id2las.json')
        las_fofn_fn = os.path.join(rawread_dir, 'las-merge-combine', 'las_fofn.json')
        add_daligner_and_merge_tasks(
            wf,
            general_config, config['job.step.da'], config['job.step.la'],
            rawread_dir,
            general_config_fn, r_db_rep_fn,
            length_cutoff_fn,
            p_id2las_fn, las_fofn_fn,
            daligner_wildcard='dal0_id',
            lamerge_wildcard='mer0_id',
            daligner_params={},
            db_prefix='raw_reads',  # TODO: Infer
            daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
        )
        ####
        if general_config['target'] == 'overlapping':
            sys.exit(0)
        # Produce new FOFN of preads fasta, based on consensus of overlaps.
        split_fn = os.path.join(
            rawread_dir, 'cns-split', 'split.json')
        bash_template_fn = os.path.join(
            rawread_dir, 'cns-split', 'consensus-bash-template.sh')
        params = dict(parameters)
        params['wildcards'] = 'cns0_id,cns0_id2'
        wf.addTask(gen_task(
            script=pype_tasks.TASK_CONSENSUS_SPLIT_SCRIPT,
            inputs={
                'p_id2las': p_id2las_fn,
                'raw_reads_db': r_db_rep_fn,
                'length_cutoff': length_cutoff_fn,
                'config': general_config_fn,
            },
            outputs={
                'split': split_fn,
                'bash_template': bash_template_fn,
            },
            parameters=params,
            dist=Dist(local=True),
        ))
        gathered_fn = os.path.join(rawread_dir, 'cns-gather', 'gathered.json')
        gen_parallel_tasks(
            wf,
            split_fn, gathered_fn,
            run_dict=dict(
                bash_template_fn=bash_template_fn,
                script=pype_tasks.TASK_CONSENSUS_TASK_SCRIPT,  # for snakemake only
                inputs={
                    #'las': '0-rawreads/cns-split/{cns0_id}/merged.{cns0_id2}.las',
                    #'db': r_db_rep_fn,
                    #'length_cutoff': length_cutoff_fn,
                    #'config': general_config_fn,
                    'units_of_work': '0-rawreads/cns-chunks/{cns0_id}/some-units-of-work.json',
                },
                outputs={
                    #'fasta': '0-rawreads/consensus/{cns0_id}/consensus.{cns0_id2}.fasta',
                    'results': '0-rawreads/cns-runs/{cns0_id}/some-done-files.json',
                },
                parameters={},
            ),
            dist=Dist(NPROC=6, job_dict=config['job.step.cns']),
        )
        preads_fofn_fn = os.path.join(rawread_dir, 'preads', 'input_preads.fofn')
        wf.addTask(gen_task(
            script=pype_tasks.TASK_CONSENSUS_GATHER_SCRIPT,
            inputs={
                'gathered': gathered_fn,
                'config': general_config_fn,
                'raw_reads_db': r_db_rep_fn,
            },
            outputs={
                'preads_fofn': preads_fofn_fn,
            },
            parameters=parameters,  # {}
            dist=Dist(local=True),
        ))
        rdir = os.path.join(rawread_dir, 'report')
        pre_assembly_report_fn = os.path.join(rdir, 'pre_assembly_stats.json')
        params = dict(parameters)
        params['length_cutoff_user'] = general_config['length_cutoff']
        params['genome_length'] = general_config['genome_size']  # note different name; historical
        wf.addTask(gen_task(
            script=pype_tasks.TASK_REPORT_PRE_ASSEMBLY_SCRIPT,
            inputs={'length_cutoff': length_cutoff_fn,
                    'raw_reads_db': r_db_rep_fn,
                    'preads_fofn': preads_fofn_fn,
                    'config': general_config_fn,
                    },
            outputs={'pre_assembly_report': pre_assembly_report_fn,
                     },
            parameters=params,
            dist=Dist(local=True),
        ))
        if general_config['target'] == 'pre-assembly':
            wf.refreshTargets()
            LOG.info('Quitting after stage-0 for General.target=pre-assembly')
            return
    # build pread database
    if general_config['input_type'] == 'preads':
        LOG.info('General.input_type=preads, so we skip stage 0-rawreads.')
        preads_fofn_fn = general_config['input_fofn']
        assert os.path.exists(preads_fofn_fn), '{!r} does not exist.'.format(preads_fofn_fn)
    # ====> 1-preads_ovl
    #pdb_build_done = os.path.join(pread_dir, 'pdb_build_done')
    #run_jobs_fn = os.path.join(pread_dir, 'run_jobs.sh')
    preads_db_fn = os.path.join(pread_dir, 'build', 'preads.db')
    length_cutoff_pr_fn = os.path.join(pread_dir, 'build', 'length_cutoff')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_BUILD_SCRIPT,
        inputs={
            'config': general_config_fn,
            'input_fofn': preads_fofn_fn,
        },
        outputs={
            'length_cutoff': length_cutoff_pr_fn,
            'db': preads_db_fn,
            # Also .preads.*, of course.
        },
        parameters=dict(),
        dist=Dist(NPROC=1, job_dict=config['job.step.dust']),
    ))
    ####
    p_id2las_fn = os.path.join(pread_dir, 'las-merge-combine', 'block2las.json')
    las_fofn_fn = os.path.join(pread_dir, 'las-merge-combine', 'las_fofn.json')
    add_daligner_and_merge_tasks(
        wf,
        general_config, config['job.step.pda'], config['job.step.pla'],
        pread_dir,
        general_config_fn, preads_db_fn,  # no tan-mask for preads
        length_cutoff_pr_fn,
        p_id2las_fn, las_fofn_fn,
        daligner_wildcard='dal1_id',
        lamerge_wildcard='mer1_id',
        daligner_params={},
        db_prefix='preads',  # TODO: Infer
        daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
    )
    ####
    db2falcon_dir = os.path.join(pread_dir, 'db2falcon')
    db2falcon_done_fn = os.path.join(db2falcon_dir, 'db2falcon_done')
    preads4falcon_fn = os.path.join(db2falcon_dir, 'preads4falcon.fasta')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_RUN_DB_TO_FALCON_SCRIPT,
        inputs={'p_id2las': p_id2las_fn,
                'preads_db': preads_db_fn,
                },
        outputs={'job_done': db2falcon_done_fn,
                 'preads4falcon': preads4falcon_fn,
                 },
        parameters={},
        dist=Dist(NPROC=4, job_dict=config['job.step.asm']),
    ))
    # ====> 2-asm-falcon
    falcon_asm_done_fn = os.path.join(falcon_asm_dir, 'falcon_asm_done')
    for key in ('overlap_filtering_setting', 'length_cutoff_pr', 'fc_ovlp_to_graph_option'):
        parameters[key] = general_config[key]
    wf.addTask(gen_task(
        script=pype_tasks.TASK_RUN_FALCON_ASM_SCRIPT,
        inputs={'db2falcon_done': db2falcon_done_fn, 'db_file': preads_db_fn,
                'preads4falcon_fasta': preads4falcon_fn,
                'las_fofn': las_fofn_fn,
                'config': general_config_fn,
                },
        outputs={'falcon_asm_done': falcon_asm_done_fn},
        parameters=parameters,
        dist=Dist(NPROC=4, job_dict=config['job.step.asm']),
    ))
    wf.refreshTargets()
    with io.cd('0-rawreads'):
        # for backwards-compatibility
        io.symlink('las-merge-combine', 'las-gather')
    #return falcon_asm_done


def add_daligner_and_merge_tasks(
        wf,
        general_config, daligner_job_config, merge_job_config,
        super_dir,
        general_config_fn, db_fn,
        length_cutoff_fn,  # not always needed (refactor later)
        p_id2las_fn, las_fofn_fn,
        daligner_wildcard,  # ='dal0_id'
        lamerge_wildcard,  # ='mer0_id'
        daligner_params=dict(),
        db_prefix='raw_reads',
        daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
):
    """
    Results:
      p_id2las_fn, las_fofn_fn
    """
    parameters = dict()
    # run daligner
    daligner_all_units_fn = os.path.join(
        super_dir, 'daligner-split', 'all-units-of-work.json')
    daligner_bash_template_fn = os.path.join(
        super_dir, 'daligner-split', 'daligner_bash_template.sh')
    params = dict(daligner_params)
    params['skip_checks'] = int(general_config.get('skip_checks', 0))
    params['wildcards'] = daligner_wildcard
    wf.addTask(gen_task(
        script=daligner_split_script,
        inputs={
            'config': general_config_fn,
            'db': db_fn,
            'length_cutoff': length_cutoff_fn,
        },
        outputs={
            'split': daligner_all_units_fn,
            'bash_template': daligner_bash_template_fn,
        },
        parameters=params,
        dist=Dist(local=True, NPROC=4),  # really, NPROC=1, but we need to know the max
    ))
    gathered_fn = os.path.join(super_dir, 'daligner-gathered', 'gathered-done-files.json')
    gen_parallel_tasks(
        wf,
        daligner_all_units_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=daligner_bash_template_fn,
            script=pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT,  # for snakemake stuff
            inputs={
                'units_of_work': os.path.join(super_dir, 'daligner-chunks/{%s}/some-units-of-work.json' % daligner_wildcard),
            },
            outputs={
                'results': os.path.join(super_dir, 'daligner-runs/{%s}/some-done-files.json' % daligner_wildcard),
            },
            parameters={},
        ),
        dist=Dist(NPROC=4, MB=4000, job_dict=daligner_job_config),
    )
    gathered_las_fn = os.path.join(super_dir, 'daligner-combine', 'gathered-las.json')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_DALIGNER_COMBINE_SCRIPT,
        inputs={
            'config': general_config_fn,
            'db': db_fn,
            'gathered': gathered_fn,
        },
        outputs={
            'las_paths': gathered_las_fn,
        },
        parameters={},
        #dist=Dist(NPROC=1, MB=4000, job_dict=daligner_job_config)
        dist=Dist(local=True),
    ))
    # Merge .las files.
    las_merge_all_units_fn = os.path.join(super_dir, 'las-merge-split', 'all-units-of-work.json')
    bash_template_fn = os.path.join(super_dir, 'las-merge-split', 'las-merge-bash-template.sh')
    params = dict(parameters)
    params['db_prefix'] = db_prefix
    params['wildcards'] = lamerge_wildcard
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_LAMERGE_SPLIT_SCRIPT,
        inputs={
            'config': general_config_fn,
            'las_paths': gathered_las_fn,
        },
        outputs={
            'split': las_merge_all_units_fn,
            'bash_template': bash_template_fn,
        },
        parameters=params,
        dist=Dist(local=True),
    ))
    gathered_fn = os.path.join(super_dir, 'las-merge-gathered', 'gathered.json')
    gen_parallel_tasks(
        wf,
        las_merge_all_units_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=bash_template_fn,
            script=pype_tasks.TASK_DB_LAMERGE_APPLY_SCRIPT,  # for snakemake
            inputs={
                'units_of_work': os.path.join(super_dir, 'las-merge-chunks/{%s}/some-units-of-work.json' % lamerge_wildcard),
            },
            outputs={
                'results': os.path.join(super_dir, 'las-merge-runs/{%s}/some-las-paths.json' % lamerge_wildcard),
            },
            parameters={},
        ),
        dist=Dist(NPROC=1, job_dict=merge_job_config),
    )
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_LAMERGE_COMBINE_SCRIPT,
        inputs={
            'config': general_config_fn,
            'gathered': gathered_fn,
        },
        outputs={
            'block2las': p_id2las_fn,
            'las_paths': las_fofn_fn,
        },
        parameters={},
        dist=Dist(local=True),
    ))


def add_rep_tasks(
        wf,
        rawread_dir, config, general_config,
        general_config_fn, i_db_fn, length_cutoff_fn,
        letter, group_size, coverage_limit,
):
    """
    Add daligner/lamerge/REPmask parallel tasks for one iteration of repeat-masking.
    TODO: Make the tasks no-ops if the codes are zero (or something like that).
    """
    name = 'rep{}'.format(letter)
    rep_dir = os.path.join(rawread_dir, name)
    o_db_rep_fn = os.path.join(rep_dir, 'rep-combine', 'raw_reads.db')
    p_id2las_fn = os.path.join(rep_dir, 'las-merge-combine', 'p_id2las.json')
    las_fofn_fn = os.path.join(rep_dir, 'las-merge-combine', 'las_fofn.json')
    rep_daligner_params = dict(
        group_size=group_size, coverage_limit=coverage_limit,
    )
    add_daligner_and_merge_tasks(
        wf,
        general_config, config['job.step.da'], config['job.step.la'],
        rep_dir,
        general_config_fn, i_db_fn,
        length_cutoff_fn,
        p_id2las_fn, las_fofn_fn,
        daligner_wildcard='dal0{}_id'.format(letter),
        lamerge_wildcard='mer0{}_id'.format(letter),
        daligner_params=rep_daligner_params,
        db_prefix='raw_reads',  # TODO: Infer
        daligner_split_script=pype_tasks.TASK_DB_REP_DALIGNER_SPLIT_SCRIPT,
    )
    ### REPmask
    # rep-split
    # We assume that daligner/LAmerge have already run.
    # Instead of using the REP.mask calls from rep-jobs.05.MASK,
    # we construct our own.
    rep_uows_fn = os.path.join(
        rep_dir, 'rep-split', 'rep-uows.json')
    rep_bash_template_fn = os.path.join(
        rep_dir, 'rep-split', 'bash_template.sh')
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_REP_SPLIT_SCRIPT,
        inputs={
            'config': general_config_fn,
            'db': i_db_fn,
            'las_paths': las_fofn_fn,
        },
        outputs={
            'split': rep_uows_fn,
            'bash_template': rep_bash_template_fn,
        },
        parameters={
            'group_size': group_size,
            'coverage_limit': coverage_limit,
            'wildcards': '{}_id'.format(name),
        },
        dist=Dist(NPROC=1),
    ))
    # rep-apply
    gathered_fn = os.path.join(rep_dir, 'rep-gathered', 'gathered-done-files.json')
    gen_parallel_tasks(
        wf,
        rep_uows_fn, gathered_fn,
        run_dict=dict(
            bash_template_fn=rep_bash_template_fn,
            script='fubar-TODO',  # pype_tasks.TASK_DB_REP_APPLY_SCRIPT, # for snakemake stuff
            inputs={
                'units_of_work': '0-rawreads/%(name)s/rep-chunks/{%(name)s_id}/some-units-of-work.json' % locals(),
            },
            outputs={
                'results': '0-rawreads/%(name)s/rep-runs/{%(name)s_id}/some-done-files.json' % locals(),
            },
            parameters={},
        ),
        dist=Dist(NPROC=4, MB=4000, job_dict=config['job.step.da']),
    )
    # rep-combine
    wf.addTask(gen_task(
        script=pype_tasks.TASK_DB_REP_COMBINE_SCRIPT,
        inputs={
            'config': general_config_fn,
            'db': i_db_fn,
            'gathered': gathered_fn,
        },
        outputs={
            'new_db': o_db_rep_fn,
        },
        parameters={
            'group_size': group_size,
        },
        dist=Dist(local=True),
    ))
    return o_db_rep_fn


def main(argv=sys.argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('config',
                        help='.cfg/.ini/.json')
    parser.add_argument('logger',
                        nargs='?',
                        help='(Optional) JSON config for standard Python logging module')
    args = parser.parse_args(argv[1:])
    main1(argv[0], args.config, args.logger)


if __name__ == '__main__':
    main()
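

# Example invocation (a sketch; because of the relative imports, this module
# runs via its package entry-point, e.g. an installed `fc_run` script, rather
# than as `python run1.py`):
#
#   fc_run fc_run.cfg
#   fc_run fc_run.cfg logging-config.json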