tiling_path.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. class TilingPathEdge(object):
  2. def __init__(self, split_line = None):
  3. self.ctg_id, self.v, self.w, self.wrid, self.b, self.e, self.score, self.identity = \
  4. None, None, None, None, None, None, None, None
  5. self.parsed = False
  6. if split_line: # pragma: no cover
  7. self.set_from(split_line)
  8. def set_from(self, split_line):
  9. assert(len(split_line) >= 8)
  10. self.parsed = False
  11. self.ctg_id = split_line[0]
  12. self.v = split_line[1]
  13. self.w = split_line[2]
  14. self.wrid = split_line[3]
  15. self.b = int(split_line[4])
  16. self.e = int(split_line[5])
  17. self.score = int(split_line[6])
  18. self.identity = float(split_line[7])
  19. self.parsed = True
  20. def get_split_line(self):
  21. return [str(val) for val in [self.ctg_id, self.v, self.w, self.wrid, self.b, self.e, self.score, self.identity]]
  22. # def __str__(self):
  23. # return ' '.join(self.get_split_line())
  24. class TilingPath(object):
  25. def __init__(self, tiling_edge_list, contig_sequence_len = None):
  26. self.edges = tiling_edge_list # These are TilingPathEdge objects.
  27. self.v_to_edge = {}
  28. self.w_to_edge = {}
  29. self.coords = {}
  30. self.contig_len = 0
  31. self.first_node_offset = 0
  32. for i in range(1, len(tiling_edge_list)):
  33. assert(tiling_edge_list[i-1].w == tiling_edge_list[i].v)
  34. # If the total contig sequence len is known, use that to
  35. # calculate the length of the first read (in case proper
  36. # contigs are specified). This is needed to offset the coordinates
  37. # which can be calculated from the tiling path.
  38. if contig_sequence_len != None:
  39. _, tiling_len = calc_node_coords(tiling_edge_list)
  40. msg = 'contig_sequence_len < tiling_len ({} < {})'.format(contig_sequence_len, tiling_len)
  41. assert contig_sequence_len >= tiling_len, msg
  42. self.first_node_offset = contig_sequence_len - tiling_len # This is the length of the first node.
  43. # The self.coords is a dict: self.coords[v] = coordinate_on_contig
  44. self.coords, self.contig_len = calc_node_coords(tiling_edge_list, self.first_node_offset)
  45. # Sanity check.
  46. assert(contig_sequence_len == None or self.contig_len == contig_sequence_len)
  47. # Create a lookup of node to edge.
  48. self.v_to_edge = {}
  49. self.w_to_edge = {}
  50. for i in range(len(self.edges)):
  51. e = self.edges[i]
  52. self.v_to_edge[e.v] = i
  53. self.w_to_edge[e.w] = i
  54. def dump_as_split_lines(self):
  55. return [e.get_split_line() for e in self.edges]
  56. def get_subpath(self, start_coord, end_coord):
  57. """
  58. Given a TilingPath object, the method called `TilingPath.get_subpath() will
  59. attempt to extract a part of the tiling path which covers the specified
  60. coordinates.
  61. For example, user could specify alignment start and end positions, and
  62. provide the coordinates to this method, and the result would be a list
  63. of tiling path edges which correspond to the tiling in between the two
  64. coordinates (most likely slightly longer on both ends).
  65. Both start and end coordinates can be < 0 if the input contig was improper.
  66. Returns a list of split_line tiling path edges (not TilingEdge objects).
  67. """
  68. assert(self.edges)
  69. assert(start_coord <= end_coord)
  70. # end_coord -= 1 # Make the end inclusive.
  71. # start_node = None
  72. # end_node = None
  73. start_edge = None
  74. end_edge = None
  75. if start_coord < self.coords[self.edges[0].v]:
  76. start_edge = 0
  77. if end_coord <= self.coords[self.edges[0].v]:
  78. end_edge = 1
  79. for i in range(len(self.edges)):
  80. e = self.edges[i]
  81. if start_coord >= self.coords[e.v] and start_coord < self.coords[e.w]:
  82. start_edge = i
  83. if end_coord > self.coords[e.v] and end_coord <= self.coords[e.w]:
  84. end_edge = i + 1
  85. if end_coord >= self.coords[self.edges[-1].w]:
  86. end_edge = len(self.edges)
  87. assert(start_edge != None and end_edge != None)
  88. # Since the start_coord and end_coord can end within an edge,
  89. # we return the position in the final contigas.
  90. new_start_coord = start_coord - self.coords[self.edges[start_edge].v]
  91. new_end_coord = end_coord - self.coords[self.edges[start_edge].v]
  92. new_path = self.edges[start_edge:end_edge]
  93. new_path = [val.get_split_line() for val in new_path]
  94. return new_path, new_start_coord, new_end_coord
  95. def calc_node_coords(tiling_edge_list, first_node_offset=0):
  96. """
  97. For a single tiling path (tiling_edge_list is a list
  98. of edges for a particular contig) calculates the
  99. genomic coordinate of every node in the path.
  100. In case there are cycles in the tiling path,
  101. the existing node's coordinate will be overwritten.
  102. `first_node_offset` refers to the length of the first node. If
  103. not specified, the contig length should not
  104. consider the length of the first node.
  105. """
  106. if not tiling_edge_list:
  107. return {}, 0
  108. coord_map = {}
  109. contig_len = 0
  110. edge0 = tiling_edge_list[0]
  111. coord_map[edge0.v] = first_node_offset
  112. for edge in tiling_edge_list:
  113. if edge.v not in coord_map:
  114. raise Exception(
  115. 'Tiling path is not in sorted order. Node "{v!r}" does not yet have an assigned coordinate.'.format(v=edge.v))
  116. coord = coord_map[edge.v]
  117. coord += abs(int(edge.b) - int(edge.e))
  118. coord_map[edge.w] = coord
  119. contig_len = max(contig_len, coord)
  120. return coord_map, contig_len
  121. def yield_split_line(fp_in):
  122. for line in fp_in: # Example row: "0 000000007:B 000000005:B 000000005 9 0 1980 99.95"
  123. line = line.strip()
  124. if len(line) == 0: continue
  125. sl = line.split()
  126. yield sl
  127. def load_tiling_paths(tp_file, contig_lens=None, whitelist_seqs=None):
  128. with open(tp_file) as fp_in:
  129. ret = load_tiling_paths_from_stream(fp_in, contig_lens=contig_lens, whitelist_seqs=whitelist_seqs)
  130. return ret
  131. def load_tiling_paths_from_stream(fp_in, contig_lens=None, whitelist_seqs=None):
  132. split_lines = list(yield_split_line(fp_in))
  133. return load_tiling_paths_from_split_lines(split_lines, contig_lens=contig_lens, whitelist_seqs=whitelist_seqs)
  134. def load_tiling_paths_from_split_lines(split_lines, contig_lens=None, whitelist_seqs=None):
  135. """
  136. Parameters:
  137. contig_lens - if a dict with contig sequence lengths is specified, the difference between the
  138. contig len and the length obtained from the tiling path will be used to offset
  139. the tiling path coordinates.
  140. whitelist_seqs - a dict or a set object containing contig IDs to load. If None, no filter will be applied.
  141. """
  142. tiling_path_edges = {}
  143. for sl in split_lines: # Example row: "0 000000007:B 000000005:B 000000005 9 0 1980 99.95"
  144. new_edge = TilingPathEdge(sl)
  145. ctg_id = new_edge.ctg_id
  146. if whitelist_seqs != None and (ctg_id in whitelist_seqs) == False:
  147. continue
  148. tiling_path_edges.setdefault(ctg_id, [])
  149. tiling_path_edges[ctg_id].append(new_edge)
  150. # Convert the flat lists to TilingPath objects.
  151. # These keep track of
  152. tiling_paths = {}
  153. for ctg_id, edges in tiling_path_edges.items():
  154. ctg_len = None
  155. if contig_lens != None and ctg_id in contig_lens:
  156. ctg_len = contig_lens[ctg_id]
  157. tiling_paths[ctg_id] = TilingPath(edges, ctg_len)
  158. return tiling_paths
  159. def find_a_ctg_placement(p_paths, a_paths):
  160. """
  161. Determines placement coordinates for each a_ctg in a given a_paths dict of
  162. TilingPaths, and returns a dict of:
  163. placement[p_ctg_id][a_ctg_id] = (start, end, p_ctg_id, a_ctg_id, first_node, last_node)
  164. """
  165. placement = {}
  166. for a_ctg_id, a_tp in a_paths.items():
  167. if len(a_tp.edges) == 0: continue # pragma: no cover
  168. first_node = a_tp.edges[0].v
  169. last_node = a_tp.edges[-1].w
  170. p_ctg_id = a_ctg_id.split('-')[0].split('_')[0]
  171. p_tp = p_paths[p_ctg_id]
  172. start, end = p_tp.coords[first_node], p_tp.coords[last_node]
  173. placement.setdefault(p_ctg_id, {})
  174. placement[p_ctg_id][a_ctg_id] = (start, end, p_ctg_id, a_ctg_id, first_node, last_node)
  175. return placement