|
- import os
- import sys
- import json
- GFA_H_TAG = 'H'
- GFA_S_TAG = 'S'
- GFA_L_TAG = 'L'
- GFA_P_TAG = 'P'
- GFA_ORIENT_FWD = '+'
- GFA_ORIENT_REV = '-'
- GFA_SEQ_UNKNOWN = '*'
- GFA_LINK_CIGAR_UNKNOWN = '*'
- GFA2_E_TAG = 'E'
- KW_NAME = 'name'
- KW_TAGS = 'tags'
- KW_LABELS = 'labels'
- KW_NODE_SEQ = 'seq'
- KW_NODE_LEN = 'len'
- KW_EDGE_SOURCE = 'v'
- KW_EDGE_SOURCE_ORIENT = 'v_orient'
- KW_EDGE_SINK = 'w'
- KW_EDGE_SINK_ORIENT = 'w_orient'
- KW_EDGE_CIGAR = 'cigar'
- KW_EDGE_SOURCE_START = 'v_start'
- KW_EDGE_SOURCE_END = 'v_end'
- KW_EDGE_SINK_START = 'w_start'
- KW_EDGE_SINK_END = 'w_end'
- KW_PATH_NODES = 'nodes'
- KW_PATH_CIGARS = 'cigars'
- """
- GFA-1:
- - H line: line = '\t'.join([GFA_H_TAG, '\tVN:Z:1.0'])
- - S line: line = '\t'.join([GFA_S_TAG, rname, GFA_SEQ_UNKNOWN if (not write_reads) else r.sequence, 'LN:i:%s' % len(r.sequence)])
- - L line: line = '\t'.join([GFA_L_TAG, edge.sg_edge.v_name, edge.sg_edge.v_orient, edge.sg_edge.w_name, edge.sg_edge.w_orient, cig_str])
- - P line: line = '\t'.join([GFA_P_TAG, ctg_name, ','.join(segs), ','.join(segs_cigar)]
- GFA-2:
- - H line: line = '\t'.join([GFA_H_TAG, '\tVN:Z:2.0'])
- - S line: line = '\t'.join([GFA_S_TAG, rname, str(len(r.sequence)), GFA_SEQ_UNKNOWN if (not write_reads) else r.sequence])
- - E line: line = '\t'.join([GFA2_E_TAG, edge_name, source_node, sink_node, source_start, source_end, sink_start, sink_end, cig_str])
- """
- class GFAGraph:
- def __init__(self):
- self.nodes = {}
- self.edges = {}
- self.paths = {}
- """
- Node: {KW_NAME: '01234', KW_NODE_SEQ: 'ACTG', 'len': 4}
- Node: {'name': '56789', KW_NODE_SEQ: 'CAGT', 'len': 4}
- Edge: {KW_NAME: 'edge1', 'source': '01234', 'sink': '56789', 'cigar': '*', 'source_start': 3, 'source_end': 4, 'sink_start': 0, 'sink_end': 1}
- Path: {KW_NAME: '000000F', 'nodes': ['01234', '56789'], '
- """
- def add_node(self, node_name, node_len, node_seq='*', tags={}, labels={}):
- if len(node_name) == 0:
- raise 'Node name should be a non-empty string.\n'
- if node_len < 0:
- raise 'Node length should be >= 0.\n'
- if len(node_seq) == 0:
- raise 'Node sequence should be a non-empty string. Use "*" instead.\n'
- if isinstance(tags, dict) == False:
- raise 'The tags object must be a dict.\n'
- if isinstance(labels, dict) == False:
- raise 'The labels object must be a dict.\n'
- self.nodes[node_name] = {
- KW_NAME: node_name,
- KW_NODE_LEN: node_len,
- KW_NODE_SEQ: node_seq,
- KW_TAGS: tags,
- KW_LABELS: labels
- }
- def add_edge(self, edge_name, source, source_orient, sink, sink_orient, source_start, source_end, sink_start, sink_end, cigar, tags={}, labels={}):
- """
- source_orient + if fwd, - otherwise.
- sink_orient + if fwd, - otherwise.
- """
- if len(edge_name) == 0:
- raise 'Edge name should be a non-empty string.\n'
- if len(source) == 0:
- raise 'Source node not specified.\n'
- if len(sink) == 0:
- raise 'Sink node not specified.\n'
- if source_orient not in '+-':
- raise 'Source orientation should be either "+" or "-".\n'
- if sink_orient not in '+-':
- raise 'Sink orientation should be either "+" or "-".\n'
- if source_start < 0 or source_end < 0:
- raise 'Source coordinates should be >= 0.\n'
- if sink_start < 0 or sink_end < 0:
- raise 'Sink coordinates should be >= 0.\n'
- if len(cigar) == 0:
- raise 'Cigar string should not be empty. Use "*" instead.\n'
- if source_end < source_start:
- sys.stderr.write('ERROR with: source = %s, source_start = %s, source_end = %s, sink = %s, sink_start = %s, sink_end = %s\n' % (source, source_start, source_end, sink, sink_start, sink_end))
- raise 'Source end coordinate should be >= source start coordinate.\n'
- if sink_end < sink_start:
- raise 'Sink end coordinate should be >= sink start coordinate.\n'
- if isinstance(tags, dict) == False:
- raise 'The tags object must be a dict.\n'
- if isinstance(labels, dict) == False:
- raise 'The labels object must be a dict.\n'
- self.edges[str((source, sink))] = {
- KW_NAME: edge_name,
- KW_EDGE_SOURCE: source,
- KW_EDGE_SOURCE_ORIENT: source_orient,
- KW_EDGE_SINK: sink,
- KW_EDGE_SINK_ORIENT: sink_orient,
- KW_EDGE_SOURCE_START: source_start,
- KW_EDGE_SOURCE_END: source_end,
- KW_EDGE_SINK_START: sink_start,
- KW_EDGE_SINK_END: sink_end,
- KW_EDGE_CIGAR: cigar,
- KW_TAGS: tags,
- KW_LABELS: labels
- }
- def add_path(self, path_name, path_nodes, path_cigars, tags={}, labels={}):
- """
- path_nodes is a list of nodes which should be joined
- consecutively in a path.
- path_cigars is a list of CIGAR strings describing how the
- two neighboring nodes are joined.
- len(path_nodes) == len(path_cigars)
- """
- if len(path_name) == 0:
- raise 'Path name should be a non-empty string.\n'
- if len(path_nodes) == 0:
- raise 'Path nodes should be a non-empty list.\n'
- if len(path_cigars) == 0:
- raise 'Path cigars should be a non-empty list.\n'
- if isinstance(tags, dict) == False:
- raise 'The tags object must be a dict.\n'
- if isinstance(labels, dict) == False:
- raise 'The labels object must be a dict.\n'
- if len(path_nodes) != len(path_cigars):
- raise 'The path_nodes and path_cigars should have the same length.\n'
- self.paths[path_name] = {
- KW_NAME: path_name,
- KW_PATH_NODES: path_nodes,
- KW_PATH_CIGARS: path_cigars,
- KW_TAGS: tags,
- KW_LABELS: labels
- }
- def write_gfa_v1(self, fp_out):
- # Header
- line = '\t'.join([GFA_H_TAG, 'VN:Z:1.0'])
- fp_out.write(line + '\n')
- # Sequences.
- for node_name, node_data in self.nodes.items():
- line = '\t'.join([ GFA_S_TAG,
- node_data[KW_NAME],
- node_data[KW_NODE_SEQ],
- 'LN:i:%d' % node_data[KW_NODE_LEN]])
- fp_out.write(line + '\n')
- for edge, edge_data in self.edges.items():
- cigar = edge_data[KW_EDGE_CIGAR] if edge_data[KW_EDGE_CIGAR] != '*' else '%dM' % (abs(edge_data[KW_EDGE_SINK_END] - edge_data[KW_EDGE_SINK_START]))
- line = '\t'.join([str(val) for val in
- [ GFA_L_TAG,
- edge_data[KW_EDGE_SOURCE],
- edge_data[KW_EDGE_SOURCE_ORIENT],
- edge_data[KW_EDGE_SINK],
- edge_data[KW_EDGE_SINK_ORIENT],
- cigar
- ]
- ])
- fp_out.write(line + '\n')
- for path_name, path_data in self.paths.items():
- line = '\t'.join([GFA_P_TAG, path_data[KW_NAME], ','.join(path_data[KW_PATH_NODES]), ','.join(path_data[KW_PATH_CIGARS])])
- fp_out.write(line + '\n')
- def write_gfa_v2(self, fp_out):
- # Header
- line = '\t'.join([GFA_H_TAG, 'VN:Z:2.0'])
- fp_out.write(line + '\n')
- # Sequences.
- for node_name, node_data in self.nodes.items():
- line = '\t'.join([ GFA_S_TAG,
- node_data[KW_NAME],
- str(node_data[KW_NODE_LEN]),
- node_data[KW_NODE_SEQ]])
- fp_out.write(line + '\n')
- for edge, edge_data in self.edges.items():
- v = edge_data[KW_EDGE_SOURCE]
- w = edge_data[KW_EDGE_SINK]
- v_len = self.nodes[v][KW_NODE_LEN]
- w_len = self.nodes[w][KW_NODE_LEN]
- # GFA-2 specifies a special char '$' when a coordinate is the same as the sequence length.
- v_start = str(edge_data[KW_EDGE_SOURCE_START]) + ('$' if edge_data[KW_EDGE_SOURCE_START] == v_len else '')
- v_end = str(edge_data[KW_EDGE_SOURCE_END]) + ('$' if edge_data[KW_EDGE_SOURCE_END] == v_len else '')
- w_start = str(edge_data[KW_EDGE_SINK_START]) + ('$' if edge_data[KW_EDGE_SINK_START] == w_len else '')
- w_end = str(edge_data[KW_EDGE_SINK_END]) + ('$' if edge_data[KW_EDGE_SINK_END] == w_len else '')
- line = '\t'.join([str(val) for val in
- [ GFA2_E_TAG, edge_data[KW_NAME],
- edge_data[KW_EDGE_SOURCE] + edge_data[KW_EDGE_SOURCE_ORIENT],
- edge_data[KW_EDGE_SINK] + edge_data[KW_EDGE_SINK_ORIENT],
- v_start, v_end,
- w_start, w_end,
- edge_data[KW_EDGE_CIGAR],
- ]
- ])
- fp_out.write(line + '\n')
- def serialize_gfa(gfa_graph):
- gfa_dict = {}
- gfa_dict['nodes'] = gfa_graph.nodes
- gfa_dict['edges'] = gfa_graph.edges
- gfa_dict['paths'] = gfa_graph.paths
- return json.dumps(gfa_dict, separators=(', ', ': '), sort_keys=True)
- def deserialize_gfa(fp_in):
- gfa_dict = json.load(fp_in)
- gfa = GFAGraph()
- gfa.nodes = gfa_dict['nodes']
- gfa.edges = gfa_dict['edges']
- gfa.paths = gfa_dict['paths']
- return gfa
|