rr_ctg_track.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from falcon_kit.multiproc import Pool
  2. import falcon_kit.util.io as io
  3. import argparse
  4. import sys
  5. import glob
  6. import os
  7. from heapq import heappush, heappop, heappushpop
  8. Reader = io.CapturedProcessReaderContext
  9. def get_rid_to_ctg(fn):
  10. rid_to_ctg = {}
  11. with open(fn) as f:
  12. for row in f:
  13. row = row.strip().split()
  14. pid, rid, oid, ctg = row
  15. rid_to_ctg.setdefault(rid, set())
  16. rid_to_ctg[rid].add(ctg)
  17. return rid_to_ctg
  18. def run_tr_stage1(db_fn, fn, min_len, bestn, rid_to_ctg):
  19. cmd = "LA4Falcon -m %s %s" % (db_fn, fn)
  20. reader = Reader(cmd)
  21. with reader:
  22. return fn, tr_stage1(reader.readlines, min_len, bestn, rid_to_ctg)
  23. def tr_stage1(readlines, min_len, bestn, rid_to_ctg):
  24. """
  25. for each read in the b-read column inside the LAS files, we
  26. keep top `bestn` hits with a priority queue through all overlaps
  27. """
  28. rtn = {}
  29. for l in readlines():
  30. l = l.strip().split()
  31. q_id, t_id = l[:2]
  32. overlap_len = -int(l[2])
  33. idt = float(l[3])
  34. q_s, q_e, q_l = int(l[5]), int(l[6]), int(l[7])
  35. t_s, t_e, t_l = int(l[9]), int(l[10]), int(l[11])
  36. if t_l < min_len:
  37. continue
  38. if q_id not in rid_to_ctg:
  39. continue
  40. rtn.setdefault(t_id, [])
  41. if len(rtn[t_id]) < bestn:
  42. heappush(rtn[t_id], (overlap_len, q_id))
  43. else:
  44. heappushpop(rtn[t_id], (overlap_len, q_id))
  45. return rtn
  46. def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
  47. io.LOG('preparing tr_stage1')
  48. io.logstats()
  49. asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
  50. rid_to_ctg = get_rid_to_ctg(os.path.join(
  51. asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
  52. inputs = []
  53. for fn in file_list:
  54. inputs.append((run_tr_stage1, db_fn, fn, min_len, bestn, rid_to_ctg))
  55. """
  56. Aggregate hits from each individual LAS and keep the best n hit.
  57. Note that this does not guarantee that the final results is globally the best n hits espcially
  58. when the number of `bestn` is too small. In those case, if there is more hits from single LAS
  59. file, then we will miss some good hits.
  60. """
  61. bread_to_areads = {}
  62. for fn, res in exe_pool.imap(io.run_func, inputs):
  63. for k in res:
  64. bread_to_areads.setdefault(k, [])
  65. for item in res[k]:
  66. if len(bread_to_areads[k]) < bestn:
  67. heappush(bread_to_areads[k], item)
  68. else:
  69. heappushpop(bread_to_areads[k], item)
  70. #rid_to_oid = open(os.path.join(rawread_dir, 'dump_rawread_ids', 'raw_read_ids')).read().split('\n')
  71. """
  72. For each b-read, we find the best contig map throgh the b->a->contig map.
  73. """
  74. with open(os.path.join(asm_dir, 'read_maps/rawread_to_contigs'), 'w') as out_f:
  75. for bread in bread_to_areads:
  76. ctg_score = {}
  77. for s, rid in bread_to_areads[bread]:
  78. if rid not in rid_to_ctg:
  79. continue
  80. ctgs = rid_to_ctg[rid]
  81. for ctg in ctgs:
  82. ctg_score.setdefault(ctg, [0, 0])
  83. ctg_score[ctg][0] += -s
  84. ctg_score[ctg][1] += 1
  85. #oid = rid_to_oid[int(bread)]
  86. ctg_score = list(ctg_score.items())
  87. ctg_score.sort(key=lambda k: k[1][0])
  88. rank = 0
  89. for ctg, score_count in ctg_score:
  90. if bread in rid_to_ctg and ctg in rid_to_ctg[bread]:
  91. in_ctg = 1
  92. else:
  93. in_ctg = 0
  94. score, count = score_count
  95. #print(bread, oid, ctg, count, rank, score, in_ctg, file=out_f)
  96. print(bread, ctg, count, rank, score, in_ctg, file=out_f)
  97. rank += 1
  98. def try_run_track_reads(n_core, base_dir, min_len, bestn):
  99. io.LOG('starting track_reads')
  100. rawread_dir = os.path.abspath(os.path.join(base_dir, "0-rawreads"))
  101. # better logic for finding the las files path or move the logic to extern (taking the --fofn option?)
  102. file_list = glob.glob(os.path.join(rawread_dir, "m*/raw_reads.*.las"))
  103. io.LOG('file list: %r' % file_list)
  104. # same, shoud we decide this as a parameter
  105. db_fn = os.path.join(rawread_dir, "raw_reads.db")
  106. n_core = min(n_core, len(file_list))
  107. exe_pool = Pool(n_core)
  108. try:
  109. run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn)
  110. io.LOG('finished track_reads')
  111. except:
  112. io.LOG('terminating track_reads workers...')
  113. exe_pool.terminate()
  114. raise
  115. def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
  116. if debug:
  117. n_core = 0
  118. silent = False
  119. if silent:
  120. io.LOG = io.write_nothing
  121. if stream:
  122. global Reader
  123. Reader = io.StreamedProcessReaderContext
  124. try_run_track_reads(n_core, base_dir, min_len, bestn)
  125. def parse_args(argv):
  126. parser = argparse.ArgumentParser(description='scan the raw read overlap information to identify the best hit from the reads \
  127. to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
  128. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  129. parser.add_argument('--n_core', type=int, default=4,
  130. help='number of processes used for for tracking reads; '
  131. '0 for main process only')
  132. #parser.add_argument('--fofn', type=str, help='file contains the path of all LAS file to be processed in parallel')
  133. #parser.add_argument('--db', type=str, dest='db_fn', help='read db file path')
  134. parser.add_argument('--base_dir', type=str, default="./",
  135. help='the base working dir of a FALCON assembly')
  136. parser.add_argument('--min_len', type=int, default=2500,
  137. help="min length of the reads")
  138. parser.add_argument('--stream', action='store_true',
  139. help='stream from LA4Falcon, instead of slurping all at once; can save memory for large data')
  140. parser.add_argument('--debug', '-g', action='store_true',
  141. help="single-threaded, plus other aids to debugging")
  142. parser.add_argument('--silent', action='store_true',
  143. help="suppress cmd reporting on stderr")
  144. parser.add_argument('--bestn', type=int, default=40,
  145. help="keep best n hits")
  146. args = parser.parse_args(argv[1:])
  147. return args
  148. def main(argv=sys.argv):
  149. args = parse_args(argv)
  150. track_reads(**vars(args))
  151. if __name__ == "__main__":
  152. main()