run1.py 25 KB


  1. from ..pype import (wrap_gen_task as gen_task, gen_parallel_tasks, Dist)
  2. from .. import run_support
  3. from .. import bash, pype_tasks, snakemake
  4. from ..util.system import (only_these_symlinks, lfs_setstripe_maybe)
  5. from .. import io
  6. from .. import functional
  7. # pylint: disable=no-name-in-module, import-error, fixme, line-too-long
  8. from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
  9. makePypeLocalFile, PypeTask)
  10. import argparse
  11. import glob
  12. import json
  13. import logging
  14. import os
  15. import re
  16. import sys
  17. import time
  18. LOG = logging.getLogger(__name__) # default, for remote tasks
  19. def check_general_config(general_config, input_config_fn):
  20. if ('pa_daligner_option' not in general_config or
  21. 'ovlp_daligner_option' not in general_config):
  22. msg = '''Missing options.
  23. We now require both "pa_daligner_option" (stage 0) and "ovlp_daligner_option" (stage 1),
  24. which are automatically passed along to
  25. HPC.daligner
  26. HPC.TANmask
  27. HPC.REPmask
  28. These can provide additional flags:
  29. pa_HPCdaligner_option
  30. pa_HPCTANmask_option
  31. ovlp_HPCdaligner_option
  32. pa_REPmask_code (-g/-c pairs for 3 iterations, e.g. '1,20;5,15;20,10')
  33. '''
  34. raise Exception(msg)
  35. required = ('input_fofn', 'genome_size')
  36. for name in required:
  37. assert name in general_config, 'Missing "{}" in {}.'.format(name, input_config_fn)
  38. def main1(prog_name, input_config_fn, logger_config_fn=None):
  39. global LOG
  40. LOG = run_support.setup_logger(logger_config_fn)
  41. lfs_setstripe_maybe(path='.', stripe=12)
  42. LOG.info('fc_run started with configuration %s', input_config_fn)
  43. try:
  44. config = run_support.parse_cfg_file(input_config_fn)
  45. import json
  46. dumped = json.dumps(config, indent=2, separators=(',', ': '), sort_keys=True)
  47. LOG.info('cfg=\n{}'.format(dumped))
  48. except Exception:
  49. LOG.exception('Failed to parse config "{}".'.format(input_config_fn))
  50. raise
  51. general_config = config['General']
  52. check_general_config(general_config, input_config_fn)
  53. input_fofn_fn = general_config['input_fofn']
  54. genome_size = int(general_config['genome_size'])
  55. squash = True if 0 < genome_size < 1000000 else False
  56. wf = PypeProcWatcherWorkflow(job_defaults=config['job.defaults'],
  57. squash=squash,
  58. )
  59. general_config['ver'] = '100'
  60. # Store config as JSON, available to many tasks.
  61. config_fn = './config.json' # must not be in a task-dir
  62. io.serialize(config_fn, config)
  63. run(wf, config,
  64. os.path.abspath(config_fn),
  65. input_fofn_fn=input_fofn_fn,
  66. )
  67. def add_bam2dexta_tasks(
  68. wf,
  69. config,
  70. input_fofn_fn, rawread_dir):
  71. # run bam2dexta
  72. bam2dexta_uows_fn = os.path.join(
  73. rawread_dir, 'bam2dexta-split', 'bam2dexta-uows.json')
  74. bam2dexta_bash_template_fn = os.path.join(
  75. rawread_dir, 'bam2dexta-split', 'bash_template.sh')
  76. wf.addTask(gen_task(
  77. script=pype_tasks.TASK_BAM2DEXTA_SPLIT_SCRIPT,
  78. inputs={
  79. 'bam': input_fofn_fn,
  80. },
  81. outputs={
  82. 'split': bam2dexta_uows_fn,
  83. 'bash_template': bam2dexta_bash_template_fn,
  84. },
  85. parameters={
  86. 'wildcards': 'bam2dexta_id',
  87. },
  88. dist=Dist(local=True),
  89. ))
  90. gathered_fn = os.path.join(rawread_dir, 'bam2dexta-gathered', 'gathered-dexta-files.json')
  91. gen_parallel_tasks(
  92. wf,
  93. bam2dexta_uows_fn, gathered_fn,
  94. run_dict=dict(
  95. bash_template_fn=bam2dexta_bash_template_fn,
  96. script='fubar-TODO', #pype_tasks.TASK_DB_TAN_APPLY_SCRIPT, # for snakemake stuff
  97. inputs={
  98. 'units_of_work': '0-rawreads/bam2dexta-chunks/{bam2dexta_id}/some-units-of-work.json',
  99. },
  100. outputs={
  101. 'results': '0-rawreads/bam2dexta-runs/{bam2dexta_id}/some-done-files.json',
  102. },
  103. parameters={},
  104. ),
  105. dist=Dist(NPROC=1, MB=4000, job_dict=config['job.step.da']),
  106. )
  107. input_fofn_fn = os.path.join(rawread_dir, 'bam2dexta-combine', 'input.fofn')
  108. wf.addTask(gen_task(
  109. script=pype_tasks.TASK_BAM2DEXTA_COMBINE_SCRIPT,
  110. inputs={
  111. 'gathered': gathered_fn,
  112. },
  113. outputs={
  114. 'fofn': input_fofn_fn,
  115. },
  116. parameters={},
  117. dist=Dist(local=True),
  118. ))
  119. return input_fofn_fn
  120. def run(wf, config,
  121. config_fn,
  122. input_fofn_fn,
  123. ):
  124. """
  125. Preconditions (for now):
  126. * LOG
  127. * run_run_support.logger
  128. """
  129. parsed_config = io.deserialize(config_fn)
  130. if parsed_config != config:
  131. msg = 'Config from {!r} != passed config'.format(config_fn)
  132. raise Exception(msg)
  133. general_config = config['General']
  134. general_config_fn = os.path.join(os.path.dirname(config_fn), 'General_config.json')
  135. io.serialize(general_config_fn, general_config) # Some tasks use this.
  136. rawread_dir = '0-rawreads'
  137. pread_dir = '1-preads_ovl'
  138. falcon_asm_dir = '2-asm-falcon'
  139. for d in (rawread_dir, pread_dir, falcon_asm_dir):
  140. run_support.make_dirs(d)
  141. # only matter for parallel jobs
  142. job_defaults = config['job.defaults']
  143. #exitOnFailure = bool(job_defaults.get('stop_all_jobs_on_failure', False))
  144. global default_njobs
  145. default_njobs = int(job_defaults.get('njobs', 7))
  146. wf.max_jobs = default_njobs
  147. assert general_config['input_type'] in (
  148. 'raw', 'preads'), 'Invalid input_type=={!r}'.format(general_config['input_type'])
  149. parameters = {}
  150. if general_config['input_type'] == 'raw':
  151. # Most common workflow: Start with rawreads.
  152. if input_fofn_fn.endswith('.xml'):
  153. input_fofn_fn = add_bam2dexta_tasks(wf, config, input_fofn_fn, rawread_dir)
  154. # import sequences into daligner DB
  155. # calculate length_cutoff (if specified as -1)
  156. # split DB
  157. # run DBdust
  158. r_db_dust_fn = os.path.join(rawread_dir, 'build', 'raw_reads.db')
  159. length_cutoff_fn = os.path.join(rawread_dir, 'build', 'length_cutoff')
  160. wf.addTask(gen_task(
  161. script=pype_tasks.TASK_DB_BUILD_SCRIPT,
  162. inputs={
  163. 'config': general_config_fn,
  164. 'input_fofn': input_fofn_fn,
  165. },
  166. outputs={
  167. 'length_cutoff': length_cutoff_fn,
  168. 'db': r_db_dust_fn,
  169. # Also .raw_reads.*, of course. And dust track.
  170. },
  171. parameters=dict(
  172. ),
  173. dist=Dist(NPROC=1, job_dict=config['job.step.dust']),
  174. ))
  175. # run TANmask
  176. tan_uows_fn = os.path.join(
  177. rawread_dir, 'tan-split', 'tan-uows.json')
  178. tan_bash_template_fn = os.path.join(
  179. rawread_dir, 'tan-split', 'bash_template.sh')
  180. wf.addTask(gen_task(
  181. script=pype_tasks.TASK_DB_TAN_SPLIT_SCRIPT,
  182. inputs={
  183. 'config': general_config_fn,
  184. 'db': r_db_dust_fn,
  185. },
  186. outputs={
  187. 'split': tan_uows_fn,
  188. 'bash_template': tan_bash_template_fn,
  189. },
  190. parameters={},
  191. dist=Dist(NPROC=1),
  192. ))
  193. gathered_fn = os.path.join(rawread_dir, 'tan-gathered', 'gathered-done-files.json')
  194. gen_parallel_tasks(
  195. wf,
  196. tan_uows_fn, gathered_fn,
  197. run_dict=dict(
  198. bash_template_fn=tan_bash_template_fn,
  199. script='fubar-TODO', #pype_tasks.TASK_DB_TAN_APPLY_SCRIPT, # for snakemake stuff
  200. inputs={
  201. 'units_of_work': '0-rawreads/tan-chunks/{tan0_id}/some-units-of-work.json',
  202. },
  203. outputs={
  204. #'job_done': '0-rawreads/{dal0_id}/daligner.done',
  205. 'results': '0-rawreads/tan-runs/{tan0_id}/some-done-files.json',
  206. },
  207. parameters={},
  208. ),
  209. dist=Dist(NPROC=4, MB=4000, job_dict=config['job.step.da']),
  210. )
  211. r_db_tan_fn = os.path.join(rawread_dir, 'tan-combine', 'raw_reads.db')
  212. wf.addTask(gen_task(
  213. script=pype_tasks.TASK_DB_TAN_COMBINE_SCRIPT,
  214. inputs={
  215. 'config': general_config_fn,
  216. 'db': r_db_dust_fn,
  217. 'gathered': gathered_fn,
  218. },
  219. outputs={
  220. 'new_db': r_db_tan_fn,
  221. },
  222. parameters={},
  223. dist=Dist(local=True),
  224. ))
  225. #### HPC.REPmask/daligner/LAmerge
  226. codes = functional.parse_REPmask_code(general_config['pa_REPmask_code'])
  227. LOG.info('Parsed pa_REPmask_code (repa,repb,repc): {!r}'.format(codes))
  228. ### REPmask tasks (a, b, c)
  229. letter = 'a'
  230. group_size, coverage_limit = codes[0]
  231. i_db_fn = r_db_tan_fn
  232. o_db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
  233. general_config_fn, i_db_fn, length_cutoff_fn,
  234. letter, group_size, coverage_limit)
  235. letter = 'b'
  236. group_size, coverage_limit = codes[1]
  237. i_db_fn = o_db_fn
  238. o_db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
  239. general_config_fn, i_db_fn, length_cutoff_fn,
  240. letter, group_size, coverage_limit)
  241. letter = 'c'
  242. group_size, coverage_limit = codes[2]
  243. i_db_fn = o_db_fn
  244. o_db_fn = add_rep_tasks(wf, rawread_dir, config, general_config,
  245. general_config_fn, i_db_fn, length_cutoff_fn,
  246. letter, group_size, coverage_limit)
  247. r_db_rep_fn = o_db_fn
  248. #### basic daligner/LAmerge
  249. p_id2las_fn = os.path.join(rawread_dir, 'las-merge-combine', 'p_id2las.json')
  250. las_fofn_fn = os.path.join(rawread_dir, 'las-merge-combine', 'las_fofn.json')
  251. add_daligner_and_merge_tasks(
  252. wf,
  253. general_config, config['job.step.da'], config['job.step.la'],
  254. rawread_dir,
  255. general_config_fn, r_db_rep_fn,
  256. length_cutoff_fn,
  257. p_id2las_fn, las_fofn_fn,
  258. daligner_wildcard='dal0_id',
  259. lamerge_wildcard='mer0_id',
  260. daligner_params={},
  261. db_prefix='raw_reads', # TODO: Infer
  262. daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
  263. )
  264. ####
  265. if general_config['target'] == 'overlapping':
  266. sys.exit(0)
  267. # Produce new FOFN of preads fasta, based on consensus of overlaps.
  268. split_fn = os.path.join(
  269. rawread_dir, 'cns-split', 'split.json')
  270. bash_template_fn = os.path.join(
  271. rawread_dir, 'cns-split', 'consensus-bash-template.sh')
  272. params = dict(parameters)
  273. params['wildcards'] = 'cns0_id,cns0_id2'
  274. wf.addTask(gen_task(
  275. script=pype_tasks.TASK_CONSENSUS_SPLIT_SCRIPT,
  276. inputs={
  277. 'p_id2las': p_id2las_fn,
  278. 'raw_reads_db': r_db_rep_fn,
  279. 'length_cutoff': length_cutoff_fn,
  280. 'config': general_config_fn,
  281. },
  282. outputs={
  283. 'split': split_fn,
  284. 'bash_template': bash_template_fn,
  285. },
  286. parameters=params,
  287. dist=Dist(local=True),
  288. ))
  289. gathered_fn = os.path.join(rawread_dir, 'cns-gather', 'gathered.json')
  290. gen_parallel_tasks(
  291. wf,
  292. split_fn, gathered_fn,
  293. run_dict=dict(
  294. bash_template_fn=bash_template_fn,
  295. script=pype_tasks.TASK_CONSENSUS_TASK_SCRIPT, # for snakemake only
  296. inputs = {
  297. #'las': '0-rawreads/cns-split/{cns0_id}/merged.{cns0_id2}.las',
  298. #'db': r_db_rep_fn,
  299. #'length_cutoff': length_cutoff_fn,
  300. #'config': general_config_fn,
  301. 'units_of_work': '0-rawreads/cns-chunks/{cns0_id}/some-units-of-work.json',
  302. },
  303. outputs = {
  304. #'fasta': '0-rawreads/consensus/{cns0_id}/consensus.{cns0_id2}.fasta',
  305. 'results': '0-rawreads/cns-runs/{cns0_id}/some-done-files.json',
  306. },
  307. parameters={},
  308. ),
  309. dist=Dist(NPROC=6, job_dict=config['job.step.cns']),
  310. )
  311. preads_fofn_fn = os.path.join(rawread_dir, 'preads', 'input_preads.fofn')
  312. wf.addTask(gen_task(
  313. script=pype_tasks.TASK_CONSENSUS_GATHER_SCRIPT,
  314. inputs={
  315. 'gathered': gathered_fn,
  316. 'config': general_config_fn,
  317. 'raw_reads_db': r_db_rep_fn,
  318. },
  319. outputs={
  320. 'preads_fofn': preads_fofn_fn,
  321. },
  322. parameters=parameters, #{},
  323. dist=Dist(local=True),
  324. ))
  325. rdir = os.path.join(rawread_dir, 'report')
  326. pre_assembly_report_fn = os.path.join(rdir, 'pre_assembly_stats.json')
  327. params = dict(parameters)
  328. params['length_cutoff_user'] = general_config['length_cutoff']
  329. params['genome_length'] = general_config['genome_size'] # note different name; historical
  330. wf.addTask(gen_task(
  331. script=pype_tasks.TASK_REPORT_PRE_ASSEMBLY_SCRIPT,
  332. inputs={'length_cutoff': length_cutoff_fn,
  333. 'raw_reads_db': r_db_rep_fn,
  334. 'preads_fofn': preads_fofn_fn,
  335. 'config': general_config_fn,
  336. },
  337. outputs={'pre_assembly_report': pre_assembly_report_fn,
  338. },
  339. parameters=params,
  340. dist=Dist(local=True),
  341. ))
  342. if general_config['target'] == 'pre-assembly':
  343. wf.refreshTargets()
  344. LOG.info('Quitting after stage-0 for General.target=pre-assembly')
  345. return
  346. # build pread database
  347. if general_config['input_type'] == 'preads':
  348. LOG.info('General.input_type=preads, so we skip stage 0-rawreads.')
  349. preads_fofn_fn = general_config['input_fofn']
  350. assert os.path.exists(preads_fofn_fn), '{!r} does not exist.'.format(preads_fofn_fn)
  351. pdb_build_done = os.path.join(pread_dir, 'pdb_build_done')
  352. run_jobs_fn = os.path.join(pread_dir, 'run_jobs.sh')
  353. preads_db_fn = os.path.join(pread_dir, 'build', 'preads.db')
  354. length_cutoff_pr_fn = os.path.join(pread_dir, 'build', 'length_cutoff')
  355. wf.addTask(gen_task(
  356. script=pype_tasks.TASK_DB_BUILD_SCRIPT,
  357. inputs={
  358. 'config': general_config_fn,
  359. 'input_fofn': preads_fofn_fn,
  360. },
  361. outputs={
  362. 'length_cutoff': length_cutoff_pr_fn,
  363. 'db': preads_db_fn,
  364. # Also .preads.*, of course.
  365. },
  366. parameters=dict(
  367. ),
  368. dist=Dist(NPROC=1, job_dict=config['job.step.dust']),
  369. ))
  370. ####
  371. p_id2las_fn = os.path.join(pread_dir, 'las-merge-combine', 'block2las.json')
  372. las_fofn_fn = os.path.join(pread_dir, 'las-merge-combine', 'las_fofn.json')
  373. add_daligner_and_merge_tasks(
  374. wf,
  375. general_config, config['job.step.pda'], config['job.step.pla'],
  376. pread_dir,
  377. general_config_fn, preads_db_fn, # no tan-mask for preads
  378. length_cutoff_pr_fn,
  379. p_id2las_fn, las_fofn_fn,
  380. daligner_wildcard='dal1_id',
  381. lamerge_wildcard='mer1_id',
  382. daligner_params={},
  383. db_prefix='preads', # TODO: Infer
  384. daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
  385. )
  386. ####
  387. db2falcon_dir = os.path.join(pread_dir, 'db2falcon')
  388. db2falcon_done_fn = os.path.join(db2falcon_dir, 'db2falcon_done')
  389. preads4falcon_fn = os.path.join(db2falcon_dir, 'preads4falcon.fasta')
  390. wf.addTask(gen_task(
  391. script=pype_tasks.TASK_RUN_DB_TO_FALCON_SCRIPT,
  392. inputs={'p_id2las': p_id2las_fn,
  393. 'preads_db': preads_db_fn,
  394. },
  395. outputs={'job_done': db2falcon_done_fn,
  396. 'preads4falcon': preads4falcon_fn,
  397. },
  398. parameters={},
  399. dist=Dist(NPROC=4, job_dict=config['job.step.asm']),
  400. ))
  401. falcon_asm_done_fn = os.path.join(falcon_asm_dir, 'falcon_asm_done')
  402. for key in ('overlap_filtering_setting', 'length_cutoff_pr', 'fc_ovlp_to_graph_option'):
  403. parameters[key] = general_config[key]
  404. wf.addTask(gen_task(
  405. script=pype_tasks.TASK_RUN_FALCON_ASM_SCRIPT,
  406. inputs={'db2falcon_done': db2falcon_done_fn, 'db_file': preads_db_fn,
  407. 'preads4falcon_fasta': preads4falcon_fn,
  408. 'las_fofn': las_fofn_fn,
  409. 'config': general_config_fn,
  410. },
  411. outputs={'falcon_asm_done': falcon_asm_done_fn},
  412. parameters=parameters,
  413. dist=Dist(NPROC=4, job_dict=config['job.step.asm']),
  414. ))
  415. wf.refreshTargets()
  416. with io.cd('0-rawreads'):
  417. # for backwards-compatibility
  418. io.symlink('las-merge-combine', 'las-gather')
  419. #return falcon_asm_done
  420. def add_daligner_and_merge_tasks(
  421. wf,
  422. general_config, daligner_job_config, merge_job_config,
  423. super_dir,
  424. general_config_fn, db_fn,
  425. length_cutoff_fn, # not always needed (refactor later)
  426. p_id2las_fn, las_fofn_fn,
  427. daligner_wildcard, #='dal0_id',
  428. lamerge_wildcard, #='mer0_id',
  429. daligner_params=dict(),
  430. db_prefix='raw_reads',
  431. daligner_split_script=pype_tasks.TASK_DB_DALIGNER_SPLIT_SCRIPT,
  432. ):
  433. """
  434. Results:
  435. p_id2las_fn, las_fofn_fn
  436. """
  437. parameters = dict()
  438. # run daligner
  439. daligner_all_units_fn = os.path.join(
  440. super_dir, 'daligner-split', 'all-units-of-work.json')
  441. daligner_bash_template_fn = os.path.join(
  442. super_dir, 'daligner-split', 'daligner_bash_template.sh')
  443. params = dict(daligner_params)
  444. params['skip_checks'] = int(general_config.get('skip_checks', 0))
  445. params['wildcards'] = daligner_wildcard
  446. wf.addTask(gen_task(
  447. script=daligner_split_script,
  448. inputs={
  449. 'config': general_config_fn,
  450. 'db': db_fn,
  451. 'length_cutoff': length_cutoff_fn,
  452. },
  453. outputs={
  454. 'split': daligner_all_units_fn,
  455. 'bash_template': daligner_bash_template_fn
  456. },
  457. parameters=params,
  458. dist=Dist(local=True, NPROC=4), # really, NPROC=1, but we need to know the max
  459. ))
  460. gathered_fn = os.path.join(super_dir, 'daligner-gathered', 'gathered-done-files.json')
  461. gen_parallel_tasks(
  462. wf,
  463. daligner_all_units_fn, gathered_fn,
  464. run_dict=dict(
  465. bash_template_fn=daligner_bash_template_fn,
  466. script=pype_tasks.TASK_DB_DALIGNER_APPLY_SCRIPT, # for snakemake stuff
  467. inputs={
  468. 'units_of_work': os.path.join(super_dir, 'daligner-chunks/{%s}/some-units-of-work.json'%daligner_wildcard),
  469. },
  470. outputs={
  471. 'results': os.path.join(super_dir, 'daligner-runs/{%s}/some-done-files.json'%daligner_wildcard),
  472. },
  473. parameters={},
  474. ),
  475. dist=Dist(NPROC=4, MB=4000, job_dict=daligner_job_config),
  476. )
  477. gathered_las_fn = os.path.join(super_dir, 'daligner-combine', 'gathered-las.json')
  478. wf.addTask(gen_task(
  479. script=pype_tasks.TASK_DB_DALIGNER_COMBINE_SCRIPT,
  480. inputs={
  481. 'config': general_config_fn,
  482. 'db': db_fn,
  483. 'gathered': gathered_fn,
  484. },
  485. outputs={
  486. 'las_paths': gathered_las_fn,
  487. },
  488. parameters={},
  489. #dist=Dist(NPROC=1, MB=4000, job_dict=daligner_job_config)
  490. dist=Dist(local=True),
  491. ))
  492. # Merge .las files.
  493. las_merge_all_units_fn = os.path.join(super_dir, 'las-merge-split', 'all-units-of-work.json')
  494. bash_template_fn = os.path.join(super_dir, 'las-merge-split', 'las-merge-bash-template.sh')
  495. params = dict(parameters)
  496. params['db_prefix'] = db_prefix
  497. params['wildcards'] = lamerge_wildcard
  498. wf.addTask(gen_task(
  499. script=pype_tasks.TASK_DB_LAMERGE_SPLIT_SCRIPT,
  500. inputs={
  501. 'config': general_config_fn,
  502. 'las_paths': gathered_las_fn,
  503. },
  504. outputs={
  505. 'split': las_merge_all_units_fn,
  506. 'bash_template': bash_template_fn,
  507. },
  508. parameters=params,
  509. dist=Dist(local=True),
  510. ))
  511. gathered_fn = os.path.join(super_dir, 'las-merge-gathered', 'gathered.json')
  512. gen_parallel_tasks(
  513. wf,
  514. las_merge_all_units_fn, gathered_fn,
  515. run_dict=dict(
  516. bash_template_fn=bash_template_fn,
  517. script=pype_tasks.TASK_DB_LAMERGE_APPLY_SCRIPT, # for snakemake
  518. inputs={
  519. 'units_of_work': os.path.join(super_dir, 'las-merge-chunks/{%s}/some-units-of-work.json'%lamerge_wildcard),
  520. },
  521. outputs={
  522. 'results': os.path.join(super_dir, 'las-merge-runs/{%s}/some-las-paths.json'%lamerge_wildcard),
  523. },
  524. parameters={},
  525. ),
  526. dist=Dist(NPROC=1, job_dict=merge_job_config),
  527. )
  528. wf.addTask(gen_task(
  529. script=pype_tasks.TASK_DB_LAMERGE_COMBINE_SCRIPT,
  530. inputs={
  531. 'config': general_config_fn,
  532. 'gathered': gathered_fn,
  533. },
  534. outputs={
  535. 'block2las': p_id2las_fn,
  536. 'las_paths': las_fofn_fn,
  537. },
  538. parameters={},
  539. dist=Dist(local=True),
  540. ))
  541. def add_rep_tasks(
  542. wf,
  543. rawread_dir, config, general_config,
  544. general_config_fn, i_db_fn, length_cutoff_fn,
  545. letter, group_size, coverage_limit,
  546. ):
  547. """
  548. Add daligner/lamerge/REPmask parallel tasks for one iteration of repeat-masking.
  549. TODO: Make the tasks no-ops if the codes are zero (or something like that).
  550. """
  551. name = 'rep{}'.format(letter)
  552. rep_dir = os.path.join(rawread_dir, name)
  553. o_db_rep_fn = os.path.join(rep_dir, 'rep-combine', 'raw_reads.db')
  554. p_id2las_fn = os.path.join(rep_dir, 'las-merge-combine', 'p_id2las.json')
  555. las_fofn_fn = os.path.join(rep_dir, 'las-merge-combine', 'las_fofn.json')
  556. rep_daligner_params = dict(
  557. group_size=group_size, coverage_limit=coverage_limit,
  558. )
  559. add_daligner_and_merge_tasks(
  560. wf,
  561. general_config, config['job.step.da'], config['job.step.la'],
  562. rep_dir,
  563. general_config_fn, i_db_fn,
  564. length_cutoff_fn,
  565. p_id2las_fn, las_fofn_fn,
  566. daligner_wildcard='dal0{}_id'.format(letter),
  567. lamerge_wildcard='mer0{}_id'.format(letter),
  568. daligner_params=rep_daligner_params,
  569. db_prefix='raw_reads', # TODO: Infer
  570. daligner_split_script=pype_tasks.TASK_DB_REP_DALIGNER_SPLIT_SCRIPT,
  571. )
  572. ### REPmask
  573. # rep-split
  574. # We assume that daligner/LAmerge have already run.
  575. # Instead of using the REP.mask calls from rep-jobs.05.MASK,
  576. # we construct our own.
  577. rep_uows_fn = os.path.join(
  578. rep_dir, 'rep-split', 'rep-uows.json')
  579. rep_bash_template_fn = os.path.join(
  580. rep_dir, 'rep-split', 'bash_template.sh')
  581. wf.addTask(gen_task(
  582. script=pype_tasks.TASK_DB_REP_SPLIT_SCRIPT,
  583. inputs={
  584. 'config': general_config_fn,
  585. 'db': i_db_fn,
  586. 'las_paths': las_fofn_fn,
  587. },
  588. outputs={
  589. 'split': rep_uows_fn,
  590. 'bash_template': rep_bash_template_fn,
  591. },
  592. parameters={
  593. 'group_size': group_size,
  594. 'coverage_limit': coverage_limit,
  595. 'wildcards': '{}_id'.format(name),
  596. },
  597. dist=Dist(NPROC=1),
  598. ))
  599. # rep-apply
  600. gathered_fn = os.path.join(rep_dir, 'rep-gathered', 'gathered-done-files.json')
  601. gen_parallel_tasks(
  602. wf,
  603. rep_uows_fn, gathered_fn,
  604. run_dict=dict(
  605. bash_template_fn=rep_bash_template_fn,
  606. script='fubar-TODO', #pype_tasks.TASK_DB_REP_APPLY_SCRIPT, # for snakemake stuff
  607. inputs={
  608. 'units_of_work': '0-rawreads/%(name)s/rep-chunks/{%(name)s_id}/some-units-of-work.json'%locals(),
  609. },
  610. outputs={
  611. 'results': '0-rawreads/%(name)s/rep-runs/{%(name)s_id}/some-done-files.json'%locals(),
  612. },
  613. parameters={},
  614. ),
  615. dist=Dist(NPROC=4, MB=4000, job_dict=config['job.step.da']),
  616. )
  617. # rep-combine
  618. wf.addTask(gen_task(
  619. script=pype_tasks.TASK_DB_REP_COMBINE_SCRIPT,
  620. inputs={
  621. 'config': general_config_fn,
  622. 'db': i_db_fn,
  623. 'gathered': gathered_fn,
  624. },
  625. outputs={
  626. 'new_db': o_db_rep_fn,
  627. },
  628. parameters={
  629. 'group_size': group_size,
  630. },
  631. dist=Dist(local=True),
  632. ))
  633. return o_db_rep_fn
  634. def main(argv=sys.argv):
  635. parser = argparse.ArgumentParser()
  636. parser.add_argument('config',
  637. help='.cfg/.ini/.json')
  638. parser.add_argument('logger',
  639. nargs='?',
  640. help='(Optional)JSON config for standard Python logging module')
  641. args = parser.parse_args(argv[1:])
  642. main1(argv[0], args.config, args.logger)
  643. if __name__ == '__main__':
  644. main()