from falcon_kit.multiproc import Pool
import falcon_kit.util.io as io
import argparse
import sys
import glob
import os
from heapq import heappush, heappop, heappushpop

Reader = io.CapturedProcessReaderContext
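# Reader wraps the external `LA4Falcon` call.  The captured variant reads the
# whole overlap dump at once; track_reads() below swaps in
# io.StreamedProcessReaderContext when --stream is given, which can save
# memory on large data sets.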
def get_rid_to_ctg(fn):
    rid_to_ctg = {}
    with open(fn) as f:
        for row in f:
            row = row.strip().split()
            pid, rid, oid, ctg = row
            rid_to_ctg.setdefault(rid, set())
            rid_to_ctg[rid].add(ctg)
    return rid_to_ctg
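# Note: the read_to_contig_map consumed above is expected to contain
# whitespace-separated lines of the form
#     <pread-id> <raw-read-id> <original-read-id> <contig-id>
# Only the raw-read id and the contig id are used here; a raw read can be
# assigned to more than one contig, hence the set of contigs per read.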
def run_tr_stage1(db_fn, fn, min_len, bestn, rid_to_ctg):
    cmd = "LA4Falcon -m %s %s" % (db_fn, fn)
    reader = Reader(cmd)
    with reader:
        return fn, tr_stage1(reader.readlines, min_len, bestn, rid_to_ctg)
def tr_stage1(readlines, min_len, bestn, rid_to_ctg):
    """
    For each read in the b-read column of the LAS files, keep the top `bestn`
    hits, maintained with a priority queue while scanning all overlaps.
    """
    rtn = {}
    for l in readlines():
        l = l.strip().split()
        q_id, t_id = l[:2]
        # Column 3 holds the (negated) overlap length; flip the sign so that
        # a larger value means a longer overlap.
        overlap_len = -int(l[2])
        idt = float(l[3])
        q_s, q_e, q_l = int(l[5]), int(l[6]), int(l[7])
        t_s, t_e, t_l = int(l[9]), int(l[10]), int(l[11])
        if t_l < min_len:
            continue
        if q_id not in rid_to_ctg:
            continue
        # A min-heap of size `bestn` keeps the `bestn` longest overlaps per
        # b-read: once full, heappushpop evicts the shortest one.
        rtn.setdefault(t_id, [])
        if len(rtn[t_id]) < bestn:
            heappush(rtn[t_id], (overlap_len, q_id))
        else:
            heappushpop(rtn[t_id], (overlap_len, q_id))
    return rtn
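# Illustration of the best-n bookkeeping above (the values are made up): with
# bestn = 2 and overlaps of length 500, 900 and 700 arriving for one b-read,
# the heap first grows to [(500, ...), (900, ...)]; the final heappushpop(700)
# evicts 500, leaving the two longest hits, 700 and 900, for that b-read.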
def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
    io.LOG('preparing tr_stage1')
    io.logstats()
    asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
    rid_to_ctg = get_rid_to_ctg(os.path.join(
        asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
    inputs = []
    for fn in file_list:
        inputs.append((run_tr_stage1, db_fn, fn, min_len, bestn, rid_to_ctg))
    """
    Aggregate the hits from each individual LAS file and keep the best n hits.
    Note that this does not guarantee that the final result is globally the
    best n hits, especially when `bestn` is small.  In that case, if a single
    LAS file contributes more than `bestn` hits, some good hits may be missed.
    """
    bread_to_areads = {}
    for fn, res in exe_pool.imap(io.run_func, inputs):
        for k in res:
            bread_to_areads.setdefault(k, [])
            for item in res[k]:
                if len(bread_to_areads[k]) < bestn:
                    heappush(bread_to_areads[k], item)
                else:
                    heappushpop(bread_to_areads[k], item)

    #rid_to_oid = open(os.path.join(rawread_dir, 'dump_rawread_ids', 'raw_read_ids')).read().split('\n')

    """
    For each b-read, find the best contig assignment through the
    b-read -> a-read -> contig map.
    """
    with open(os.path.join(asm_dir, 'read_maps/rawread_to_contigs'), 'w') as out_f:
        for bread in bread_to_areads:
            ctg_score = {}
            for s, rid in bread_to_areads[bread]:
                if rid not in rid_to_ctg:
                    continue
                ctgs = rid_to_ctg[rid]
                for ctg in ctgs:
                    ctg_score.setdefault(ctg, [0, 0])
                    ctg_score[ctg][0] += -s
                    ctg_score[ctg][1] += 1
            #oid = rid_to_oid[int(bread)]
            ctg_score = list(ctg_score.items())
            # Sort by accumulated negated overlap length, so the contig with
            # the largest total overlap support comes first (rank 0).
            ctg_score.sort(key=lambda k: k[1][0])
            rank = 0
            for ctg, score_count in ctg_score:
                if bread in rid_to_ctg and ctg in rid_to_ctg[bread]:
                    in_ctg = 1
                else:
                    in_ctg = 0
                score, count = score_count
                #print(bread, oid, ctg, count, rank, score, in_ctg, file=out_f)
                print(bread, ctg, count, rank, score, in_ctg, file=out_f)
                rank += 1
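# The rawread_to_contigs file written above has one row per (b-read, contig)
# pair with six whitespace-separated columns:
#   b-read id, contig id, hit count, rank (0 = best contig for that read),
#   score (negated total overlap length, so more negative is better), and
#   in_ctg (1 if the b-read itself was assigned to that contig by the
#   read_to_contig_map, else 0).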
def try_run_track_reads(n_core, base_dir, min_len, bestn):
    io.LOG('starting track_reads')
    rawread_dir = os.path.abspath(os.path.join(base_dir, "0-rawreads"))

    # TODO: find the LAS file paths more robustly, or move this logic out of
    # the script (e.g. by taking a --fofn option).
    file_list = glob.glob(os.path.join(rawread_dir, "m*/raw_reads.*.las"))
    io.LOG('file list: %r' % file_list)

    # Likewise, should the DB path be a parameter?
    db_fn = os.path.join(rawread_dir, "raw_reads.db")
    n_core = min(n_core, len(file_list))
    exe_pool = Pool(n_core)
    try:
        run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn)
        io.LOG('finished track_reads')
    except:
        io.LOG('terminating track_reads workers...')
        exe_pool.terminate()
        raise
def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
    if debug:
        n_core = 0
        silent = False
    if silent:
        io.LOG = io.write_nothing
    if stream:
        global Reader
        Reader = io.StreamedProcessReaderContext
    try_run_track_reads(n_core, base_dir, min_len, bestn)
def parse_args(argv):
    parser = argparse.ArgumentParser(
        description='Scan the raw-read overlap information to identify the best hits from the reads '
                    'to the contigs, using the read_to_contig_map generated by `fc_get_read_ctg_map` '
                    'in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--n_core', type=int, default=4,
                        help='number of processes used for tracking reads; '
                        '0 for main process only')
    #parser.add_argument('--fofn', type=str, help='file containing the paths of all LAS files to be processed in parallel')
    #parser.add_argument('--db', type=str, dest='db_fn', help='read DB file path')
    parser.add_argument('--base_dir', type=str, default="./",
                        help='the base working dir of a FALCON assembly')
    parser.add_argument('--min_len', type=int, default=2500,
                        help='min length of the reads')
    parser.add_argument('--stream', action='store_true',
                        help='stream from LA4Falcon instead of slurping all output at once; can save memory for large data')
    parser.add_argument('--debug', '-g', action='store_true',
                        help='single-threaded, plus other aids to debugging')
    parser.add_argument('--silent', action='store_true',
                        help='suppress cmd reporting on stderr')
    parser.add_argument('--bestn', type=int, default=40,
                        help='keep the best n hits')
    args = parser.parse_args(argv[1:])
    return args
def main(argv=sys.argv):
    args = parse_args(argv)
    track_reads(**vars(args))


if __name__ == "__main__":
    main()
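# Example invocation (illustrative only; the script/module name depends on how
# this file is installed, and the paths assume a standard FALCON layout with
# 0-rawreads/ and 2-asm-falcon/ under --base_dir):
#   python rr_ctg_track.py --base_dir ./ --n_core 8 --min_len 2500 --bestn 40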