pr_ctg_track.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. from falcon_kit.multiproc import Pool
  2. import falcon_kit.util.io as io
  3. import argparse
  4. import sys
  5. import glob
  6. import os
  7. from heapq import heappush, heappop, heappushpop
  8. Reader = io.CapturedProcessReaderContext
  9. def get_pid_to_ctg(fn):
  10. pid_to_ctg = {}
  11. with open(fn) as f:
  12. for row in f:
  13. row = row.strip().split()
  14. pid, rid, oid, ctg = row
  15. pid_to_ctg.setdefault(pid, set())
  16. pid_to_ctg[pid].add(ctg)
  17. return pid_to_ctg
  18. def run_tr_stage1(db_fn, fn, min_len, bestn, pid_to_ctg):
  19. cmd = "LA4Falcon -mo %s %s" % (db_fn, fn)
  20. reader = Reader(cmd)
  21. with reader:
  22. return fn, tr_stage1(reader.readlines, min_len, bestn, pid_to_ctg)
  23. def tr_stage1(readlines, min_len, bestn, pid_to_ctg):
  24. """
  25. for each read in the b-read column inside the LAS files, we
  26. keep top `bestn` hits with a priority queue through all overlaps
  27. """
  28. rtn = {}
  29. for l in readlines():
  30. l = l.strip().split()
  31. q_id, t_id = l[:2]
  32. overlap_len = -int(l[2])
  33. idt = float(l[3])
  34. q_s, q_e, q_l = int(l[5]), int(l[6]), int(l[7])
  35. t_s, t_e, t_l = int(l[9]), int(l[10]), int(l[11])
  36. if t_l < min_len:
  37. continue
  38. if q_id not in pid_to_ctg:
  39. continue
  40. rtn.setdefault(t_id, [])
  41. if len(rtn[t_id]) < bestn:
  42. heappush(rtn[t_id], (overlap_len, q_id))
  43. else:
  44. heappushpop(rtn[t_id], (overlap_len, q_id))
  45. return rtn
  46. def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
  47. io.LOG('preparing tr_stage1')
  48. io.logstats()
  49. asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
  50. pid_to_ctg = get_pid_to_ctg(os.path.join(
  51. asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
  52. io.LOG('len(pid_to_ctg) == {}'.format(len(pid_to_ctg)))
  53. assert pid_to_ctg, 'Empty pid_to_ctg. Maybe empty {!r}?'.format(file_list)
  54. inputs = []
  55. for fn in file_list:
  56. inputs.append((run_tr_stage1, db_fn, fn, min_len, bestn, pid_to_ctg))
  57. """
  58. Aggregate hits from each individual LAS and keep the best n hit.
  59. Note that this does not guarantee that the final results is globally the best n hits espcially
  60. when the number of `bestn` is too small. In those case, if there is more hits from single LAS
  61. file, then we will miss some good hits.
  62. """
  63. bread_to_areads = {}
  64. for fn, res in exe_pool.imap(io.run_func, inputs):
  65. for k in res:
  66. bread_to_areads.setdefault(k, [])
  67. for item in res[k]:
  68. if len(bread_to_areads[k]) < bestn:
  69. heappush(bread_to_areads[k], item)
  70. else:
  71. heappushpop(bread_to_areads[k], item)
  72. assert bread_to_areads, 'No bread_to_areads found. Is there any point in continuing?'
  73. with open(os.path.join(asm_dir, "read_maps/pread_to_contigs"), "w") as out_f:
  74. for bread in bread_to_areads:
  75. ctg_score = {}
  76. for s, pid in bread_to_areads[bread]:
  77. if pid not in pid_to_ctg:
  78. continue
  79. ctgs = pid_to_ctg[pid]
  80. for ctg in ctgs:
  81. ctg_score.setdefault(ctg, [0, 0])
  82. ctg_score[ctg][0] += -s
  83. ctg_score[ctg][1] += 1
  84. ctg_score = list(ctg_score.items())
  85. ctg_score.sort(key=lambda k: k[1][0])
  86. rank = 0
  87. for ctg, score_count in ctg_score:
  88. if bread in pid_to_ctg and ctg in pid_to_ctg[bread]:
  89. in_ctg = 1
  90. else:
  91. in_ctg = 0
  92. score, count = score_count
  93. print(bread, ctg, count, rank, score, in_ctg, file=out_f)
  94. rank += 1
  95. def try_run_track_reads(n_core, base_dir, min_len, bestn):
  96. io.LOG('starting track_reads')
  97. pread_dir = os.path.abspath(os.path.join(base_dir, "1-preads_ovl"))
  98. file_list = glob.glob(os.path.join(pread_dir, "m*/preads.*.las"))
  99. io.LOG('file list: %r' % file_list)
  100. db_fn = os.path.join(pread_dir, "preads.db")
  101. n_core = min(n_core, len(file_list))
  102. exe_pool = Pool(n_core)
  103. try:
  104. run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn)
  105. io.LOG('finished track_reads')
  106. except:
  107. io.LOG('terminating track_reads workers...')
  108. exe_pool.terminate()
  109. raise
  110. def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
  111. if debug:
  112. n_core = 0
  113. silent = False
  114. if silent:
  115. io.LOG = io.write_nothing
  116. if stream:
  117. global Reader
  118. Reader = io.StreamedProcessReaderContext
  119. try_run_track_reads(n_core, base_dir, min_len, bestn)
  120. def parse_args(argv):
  121. parser = argparse.ArgumentParser(description='scan the pread overlap information to identify the best hit from the preads \
  122. to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
  123. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  124. parser.add_argument('--n_core', type=int, default=4,
  125. help='number of processes used for for tracking reads; '
  126. '0 for main process only')
  127. #parser.add_argument('--fofn', type=str, help='file contains the path of all LAS file to be processed in parallel')
  128. #parser.add_argument('--db', type=str, dest='db_fn', help='read db file path')
  129. parser.add_argument('--base_dir', type=str, default="./",
  130. help='the base working dir of a FALCON assembly')
  131. parser.add_argument('--min_len', type=int, default=2500,
  132. help="min length of the reads")
  133. parser.add_argument('--stream', action='store_true',
  134. help='stream from LA4Falcon, instead of slurping all at once; can save memory for large data')
  135. parser.add_argument('--debug', '-g', action='store_true',
  136. help="single-threaded, plus other aids to debugging")
  137. parser.add_argument('--silent', action='store_true',
  138. help="suppress cmd reporting on stderr")
  139. parser.add_argument('--bestn', type=int, default=40,
  140. help="keep best n hits")
  141. args = parser.parse_args(argv[1:])
  142. return args
  143. def main(argv=sys.argv):
  144. args = parse_args(argv)
  145. track_reads(**vars(args))
  146. if __name__ == "__main__":
  147. main()