123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- from falcon_kit.FastaReader import open_fasta_reader
- import argparse
- import sys
- # import falcon_kit.align_dw as align
- import falcon_kit.align_edlib as align
- class TooLongError(Exception): pass
- def log(msg):
- sys.stderr.write(msg)
- sys.stderr.write('\n')
- def yield_single_compound(fp_in):
- """
- Loads all a_ctg with the same ctg_id and a_id.
- The header of a_ctg is:
- p_ctg_id-a_id-sub_id v w total_length total_score num_path_edges delta_len idt cov
- The first sequence in the returned list is the base a_ctg (the exact sequence
- which is part of the primary contig (the base of the bubble)).
- """
- ret = []
- prev_id = None
- for r in fp_in:
- tig_id, v, w, len_, ovl, ne, delta_l, idt, cov = r.name.split()
- p_ctg_id, a_id, sub_id = tig_id.split('-')
- curr_id = (p_ctg_id, a_id)
- if prev_id != None and prev_id != curr_id:
- yield ret
- ret = []
- prev_id = curr_id
- ret.append(r)
- yield ret
- def filter_duplicate(compound_a_ctg, max_idt, max_aln_cov, min_len_diff, min_seq_len, ploidy):
- """
- Takes a list of a_ctg sequences in a compound unitig (bubble) which need
- to be deduplicated according to the parameters.
- The zeroth sequence in the list is the "base" sequence. This sequence is
- already part of the primary path by definition, and will not be output.
- """
- ret = []
- # Sanity check.
- if len(compound_a_ctg) == 0: return ret # pragma: no cover
- ref_seqs = [compound_a_ctg[0]]
- # Zeroth sequence is the base seq.
- for i in range(1, len(compound_a_ctg)):
- header = compound_a_ctg[i].name.split()
- a_ctg_id, v, w, len_, ovl, ne, delta_l, idt, cov = header
- a_ctg_seq = compound_a_ctg[i].sequence
- # Reset the values
- delta_l, idt, cov = 0.0, 1.0, 1.0
- is_duplicate = False
- # Align against the base sequence and all non-filtered alternate branches.
- loop_to = len(ref_seqs) if ploidy <= 0 else min(ploidy, len(ref_seqs))
- for j in range(0, loop_to):
- # Just fetch the components for readibility.
- ref_ctg_id = ref_seqs[j].name.split()[0]
- ref_seq = ref_seqs[j].sequence
- log('[i = %d, j = %d] Comparing: query "%s" vs ref "%s".' % (i, j, a_ctg_id, ref_ctg_id))
- # Align.
- delta_l, idt, cov = align.get_aln_results(ref_seq, a_ctg_seq, min_seq_len)
- # Round to the floor of 2 decimal places. Needed to reproduce
- # old behaviour.
- idt = float('%.2f' % (idt))
- cov = float('%.2f' % (cov))
- log(' Rounded: new_delta_l = %d, new_idt = %.2f, new_cov = %.2f' % (delta_l, idt, cov))
- # Check if this is a duplicate.
- # The same conditions apply as in the old version.
- if 100 * idt > max_idt and \
- 100 * cov > max_aln_cov and \
- abs(delta_l) < min_len_diff:
- is_duplicate = True
- log(' -> Duplicate!')
- break
- if is_duplicate == False:
- # This branch is not a duplicate. Add it to references,
- # so that the following branches can be compared to it afterwards.
- ref_seqs.append(compound_a_ctg[i])
- # Append the non-duplicates.
- new_header = ' '.join([a_ctg_id, v, w, len_, ovl, ne, str(delta_l), '%.2f' % (idt), '%.2f' % (cov)])
- ret.append((compound_a_ctg[i], new_header))
- log('')
- return ret
- def run(fp_out, fp_in, max_idt, max_aln_cov, min_len_diff, min_seq_len, ploidy):
- for compound_a_ctg in yield_single_compound(fp_in):
- filtered_a_ctg = filter_duplicate(compound_a_ctg, max_idt, max_aln_cov, min_len_diff, min_seq_len, ploidy)
- for a_ctg, new_header in filtered_a_ctg:
- fp_out.write('>%s\n' % (new_header))
- fp_out.write(a_ctg.sequence)
- fp_out.write('\n')
- def parse_args(argv):
- parser = argparse.ArgumentParser(description='Removes duplicate a-tig, iff *all* conditions are violated. Assumes the working directory has the a_ctg_all.fa file, and produces a_ctg.fa',
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument('--max-idt', type=int,
- help="Keep a-tig if the identity (in %%) to the primary contig is <= max_idt", default=96)
- parser.add_argument('--max-aln-cov', type=int,
- help="Keep a-tig if the alignment coverage (in %%) on the a-tig is <= max_aln_cov", default=97)
- parser.add_argument('--min-len-diff', type=int,
- help="Keep a-tig if the length different > min_len_diff", default=500)
- parser.add_argument('--min-seq-len', type=int,
- help="Branches with length less than this threshold will always be deduplicated.", default=2000)
- parser.add_argument('--ploidy', type=int,
- help="For a diplid genome, 2 branches per SV are expected. This parameter limits the number of pairwise comparison. If <= 0, this threshold is not applied.", default=2)
- parser.add_argument('--a-ctg-all', type=str,
- help="Input set of all associate contigs for deduplication.", default="a_ctg_all.fa")
- args = parser.parse_args(argv[1:])
- return args
- def main(argv=sys.argv):
- args = parse_args(argv)
- with open_fasta_reader(args.a_ctg_all) as fp_in:
- run(sys.stdout, fp_in, args.max_idt, args.max_aln_cov, args.min_len_diff, args.min_seq_len, args.ploidy)
- if __name__ == "__main__": # pragma: no cover
- main(sys.argv) # pragma: no cover
|