gfa_graph.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import os
  2. import sys
  3. import json
  4. GFA_H_TAG = 'H'
  5. GFA_S_TAG = 'S'
  6. GFA_L_TAG = 'L'
  7. GFA_P_TAG = 'P'
  8. GFA_ORIENT_FWD = '+'
  9. GFA_ORIENT_REV = '-'
  10. GFA_SEQ_UNKNOWN = '*'
  11. GFA_LINK_CIGAR_UNKNOWN = '*'
  12. GFA2_E_TAG = 'E'
  13. KW_NAME = 'name'
  14. KW_TAGS = 'tags'
  15. KW_LABELS = 'labels'
  16. KW_NODE_SEQ = 'seq'
  17. KW_NODE_LEN = 'len'
  18. KW_EDGE_SOURCE = 'v'
  19. KW_EDGE_SOURCE_ORIENT = 'v_orient'
  20. KW_EDGE_SINK = 'w'
  21. KW_EDGE_SINK_ORIENT = 'w_orient'
  22. KW_EDGE_CIGAR = 'cigar'
  23. KW_EDGE_SOURCE_START = 'v_start'
  24. KW_EDGE_SOURCE_END = 'v_end'
  25. KW_EDGE_SINK_START = 'w_start'
  26. KW_EDGE_SINK_END = 'w_end'
  27. KW_PATH_NODES = 'nodes'
  28. KW_PATH_CIGARS = 'cigars'
  29. """
  30. GFA-1:
  31. - H line: line = '\t'.join([GFA_H_TAG, '\tVN:Z:1.0'])
  32. - S line: line = '\t'.join([GFA_S_TAG, rname, GFA_SEQ_UNKNOWN if (not write_reads) else r.sequence, 'LN:i:%s' % len(r.sequence)])
  33. - L line: line = '\t'.join([GFA_L_TAG, edge.sg_edge.v_name, edge.sg_edge.v_orient, edge.sg_edge.w_name, edge.sg_edge.w_orient, cig_str])
  34. - P line: line = '\t'.join([GFA_P_TAG, ctg_name, ','.join(segs), ','.join(segs_cigar)]
  35. GFA-2:
  36. - H line: line = '\t'.join([GFA_H_TAG, '\tVN:Z:2.0'])
  37. - S line: line = '\t'.join([GFA_S_TAG, rname, str(len(r.sequence)), GFA_SEQ_UNKNOWN if (not write_reads) else r.sequence])
  38. - E line: line = '\t'.join([GFA2_E_TAG, edge_name, source_node, sink_node, source_start, source_end, sink_start, sink_end, cig_str])
  39. """
  40. class GFAGraph:
  41. def __init__(self):
  42. self.nodes = {}
  43. self.edges = {}
  44. self.paths = {}
  45. """
  46. Node: {KW_NAME: '01234', KW_NODE_SEQ: 'ACTG', 'len': 4}
  47. Node: {'name': '56789', KW_NODE_SEQ: 'CAGT', 'len': 4}
  48. Edge: {KW_NAME: 'edge1', 'source': '01234', 'sink': '56789', 'cigar': '*', 'source_start': 3, 'source_end': 4, 'sink_start': 0, 'sink_end': 1}
  49. Path: {KW_NAME: '000000F', 'nodes': ['01234', '56789'], '
  50. """
  51. def add_node(self, node_name, node_len, node_seq='*', tags={}, labels={}):
  52. if len(node_name) == 0:
  53. raise 'Node name should be a non-empty string.\n'
  54. if node_len < 0:
  55. raise 'Node length should be >= 0.\n'
  56. if len(node_seq) == 0:
  57. raise 'Node sequence should be a non-empty string. Use "*" instead.\n'
  58. if isinstance(tags, dict) == False:
  59. raise 'The tags object must be a dict.\n'
  60. if isinstance(labels, dict) == False:
  61. raise 'The labels object must be a dict.\n'
  62. self.nodes[node_name] = {
  63. KW_NAME: node_name,
  64. KW_NODE_LEN: node_len,
  65. KW_NODE_SEQ: node_seq,
  66. KW_TAGS: tags,
  67. KW_LABELS: labels
  68. }
  69. def add_edge(self, edge_name, source, source_orient, sink, sink_orient, source_start, source_end, sink_start, sink_end, cigar, tags={}, labels={}):
  70. """
  71. source_orient + if fwd, - otherwise.
  72. sink_orient + if fwd, - otherwise.
  73. """
  74. if len(edge_name) == 0:
  75. raise 'Edge name should be a non-empty string.\n'
  76. if len(source) == 0:
  77. raise 'Source node not specified.\n'
  78. if len(sink) == 0:
  79. raise 'Sink node not specified.\n'
  80. if source_orient not in '+-':
  81. raise 'Source orientation should be either "+" or "-".\n'
  82. if sink_orient not in '+-':
  83. raise 'Sink orientation should be either "+" or "-".\n'
  84. if source_start < 0 or source_end < 0:
  85. raise 'Source coordinates should be >= 0.\n'
  86. if sink_start < 0 or sink_end < 0:
  87. raise 'Sink coordinates should be >= 0.\n'
  88. if len(cigar) == 0:
  89. raise 'Cigar string should not be empty. Use "*" instead.\n'
  90. if source_end < source_start:
  91. sys.stderr.write('ERROR with: source = %s, source_start = %s, source_end = %s, sink = %s, sink_start = %s, sink_end = %s\n' % (source, source_start, source_end, sink, sink_start, sink_end))
  92. raise 'Source end coordinate should be >= source start coordinate.\n'
  93. if sink_end < sink_start:
  94. raise 'Sink end coordinate should be >= sink start coordinate.\n'
  95. if isinstance(tags, dict) == False:
  96. raise 'The tags object must be a dict.\n'
  97. if isinstance(labels, dict) == False:
  98. raise 'The labels object must be a dict.\n'
  99. self.edges[str((source, sink))] = {
  100. KW_NAME: edge_name,
  101. KW_EDGE_SOURCE: source,
  102. KW_EDGE_SOURCE_ORIENT: source_orient,
  103. KW_EDGE_SINK: sink,
  104. KW_EDGE_SINK_ORIENT: sink_orient,
  105. KW_EDGE_SOURCE_START: source_start,
  106. KW_EDGE_SOURCE_END: source_end,
  107. KW_EDGE_SINK_START: sink_start,
  108. KW_EDGE_SINK_END: sink_end,
  109. KW_EDGE_CIGAR: cigar,
  110. KW_TAGS: tags,
  111. KW_LABELS: labels
  112. }
  113. def add_path(self, path_name, path_nodes, path_cigars, tags={}, labels={}):
  114. """
  115. path_nodes is a list of nodes which should be joined
  116. consecutively in a path.
  117. path_cigars is a list of CIGAR strings describing how the
  118. two neighboring nodes are joined.
  119. len(path_nodes) == len(path_cigars)
  120. """
  121. if len(path_name) == 0:
  122. raise 'Path name should be a non-empty string.\n'
  123. if len(path_nodes) == 0:
  124. raise 'Path nodes should be a non-empty list.\n'
  125. if len(path_cigars) == 0:
  126. raise 'Path cigars should be a non-empty list.\n'
  127. if isinstance(tags, dict) == False:
  128. raise 'The tags object must be a dict.\n'
  129. if isinstance(labels, dict) == False:
  130. raise 'The labels object must be a dict.\n'
  131. if len(path_nodes) != len(path_cigars):
  132. raise 'The path_nodes and path_cigars should have the same length.\n'
  133. self.paths[path_name] = {
  134. KW_NAME: path_name,
  135. KW_PATH_NODES: path_nodes,
  136. KW_PATH_CIGARS: path_cigars,
  137. KW_TAGS: tags,
  138. KW_LABELS: labels
  139. }
  140. def write_gfa_v1(self, fp_out):
  141. # Header
  142. line = '\t'.join([GFA_H_TAG, 'VN:Z:1.0'])
  143. fp_out.write(line + '\n')
  144. # Sequences.
  145. for node_name, node_data in self.nodes.items():
  146. line = '\t'.join([ GFA_S_TAG,
  147. node_data[KW_NAME],
  148. node_data[KW_NODE_SEQ],
  149. 'LN:i:%d' % node_data[KW_NODE_LEN]])
  150. fp_out.write(line + '\n')
  151. for edge, edge_data in self.edges.items():
  152. cigar = edge_data[KW_EDGE_CIGAR] if edge_data[KW_EDGE_CIGAR] != '*' else '%dM' % (abs(edge_data[KW_EDGE_SINK_END] - edge_data[KW_EDGE_SINK_START]))
  153. line = '\t'.join([str(val) for val in
  154. [ GFA_L_TAG,
  155. edge_data[KW_EDGE_SOURCE],
  156. edge_data[KW_EDGE_SOURCE_ORIENT],
  157. edge_data[KW_EDGE_SINK],
  158. edge_data[KW_EDGE_SINK_ORIENT],
  159. cigar
  160. ]
  161. ])
  162. fp_out.write(line + '\n')
  163. for path_name, path_data in self.paths.items():
  164. line = '\t'.join([GFA_P_TAG, path_data[KW_NAME], ','.join(path_data[KW_PATH_NODES]), ','.join(path_data[KW_PATH_CIGARS])])
  165. fp_out.write(line + '\n')
  166. def write_gfa_v2(self, fp_out):
  167. # Header
  168. line = '\t'.join([GFA_H_TAG, 'VN:Z:2.0'])
  169. fp_out.write(line + '\n')
  170. # Sequences.
  171. for node_name, node_data in self.nodes.items():
  172. line = '\t'.join([ GFA_S_TAG,
  173. node_data[KW_NAME],
  174. str(node_data[KW_NODE_LEN]),
  175. node_data[KW_NODE_SEQ]])
  176. fp_out.write(line + '\n')
  177. for edge, edge_data in self.edges.items():
  178. v = edge_data[KW_EDGE_SOURCE]
  179. w = edge_data[KW_EDGE_SINK]
  180. v_len = self.nodes[v][KW_NODE_LEN]
  181. w_len = self.nodes[w][KW_NODE_LEN]
  182. # GFA-2 specifies a special char '$' when a coordinate is the same as the sequence length.
  183. v_start = str(edge_data[KW_EDGE_SOURCE_START]) + ('$' if edge_data[KW_EDGE_SOURCE_START] == v_len else '')
  184. v_end = str(edge_data[KW_EDGE_SOURCE_END]) + ('$' if edge_data[KW_EDGE_SOURCE_END] == v_len else '')
  185. w_start = str(edge_data[KW_EDGE_SINK_START]) + ('$' if edge_data[KW_EDGE_SINK_START] == w_len else '')
  186. w_end = str(edge_data[KW_EDGE_SINK_END]) + ('$' if edge_data[KW_EDGE_SINK_END] == w_len else '')
  187. line = '\t'.join([str(val) for val in
  188. [ GFA2_E_TAG, edge_data[KW_NAME],
  189. edge_data[KW_EDGE_SOURCE] + edge_data[KW_EDGE_SOURCE_ORIENT],
  190. edge_data[KW_EDGE_SINK] + edge_data[KW_EDGE_SINK_ORIENT],
  191. v_start, v_end,
  192. w_start, w_end,
  193. edge_data[KW_EDGE_CIGAR],
  194. ]
  195. ])
  196. fp_out.write(line + '\n')
  197. def serialize_gfa(gfa_graph):
  198. gfa_dict = {}
  199. gfa_dict['nodes'] = gfa_graph.nodes
  200. gfa_dict['edges'] = gfa_graph.edges
  201. gfa_dict['paths'] = gfa_graph.paths
  202. return json.dumps(gfa_dict, separators=(', ', ': '), sort_keys=True)
  203. def deserialize_gfa(fp_in):
  204. gfa_dict = json.load(fp_in)
  205. gfa = GFAGraph()
  206. gfa.nodes = gfa_dict['nodes']
  207. gfa.edges = gfa_dict['edges']
  208. gfa.paths = gfa_dict['paths']
  209. return gfa