123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- from builtins import next
- from builtins import range
- from builtins import object
- from os.path import abspath, expanduser
- from .io import NativeIO as StringIO
- from .io import FilePercenter
- import contextlib
- import gzip
- import hashlib
- import logging
- import os
- import re
- import subprocess
- import sys
- import warnings
- LOG = logging.getLogger(__name__)
- ##
- # Utility functions for FastaReader
- ##
- def wrap(s, columns):
- return "\n".join(s[start:start + columns]
- for start in range(0, len(s), columns))
- def splitFastaHeader(name):
- """
- Split a FASTA/FASTQ header into its id and metadata components
- >>> splitFastaHeader('>m54329_180926_230856/34669168/0_42 FOO=BAR X=Y')
- ('>m54329_180926_230856/34669168/0_42', 'FOO=BAR X=Y')
- """
- nameParts = re.split(r'\s', name, maxsplit=1)
- id_ = nameParts[0]
- if len(nameParts) > 1:
- metadata = nameParts[1].strip()
- else:
- metadata = None
- return (id_, metadata)
- def splitFileContents(f, delimiter, BLOCKSIZE=8192):
- """
- Same semantics as f.read().split(delimiter), but with memory usage
- determined by largest chunk rather than entire file size
- """
- remainder = StringIO()
- while True:
- block = f.read(BLOCKSIZE)
- if not block:
- break
- parts = block.split(delimiter)
- remainder.write(parts[0])
- for part in parts[1:]:
- yield remainder.getvalue()
- remainder = StringIO()
- remainder.write(part)
- yield remainder.getvalue()
- class FastaRecord(object):
- """
- A FastaRecord object models a named sequence in a FASTA file.
- """
- DELIMITER = ">"
- COLUMNS = 60
- def __init__(self, name, sequence):
- try:
- assert "\n" not in name
- assert "\n" not in sequence
- assert self.DELIMITER not in sequence
- self._name = name
- self._sequence = sequence
- self._md5 = hashlib.md5(self._sequence.encode('ascii')).hexdigest()
- self._id, self._metadata = splitFastaHeader(name)
- except AssertionError:
- raise ValueError("Invalid FASTA record data")
- @property
- def name(self):
- """
- The name of the sequence in the FASTA file, equal to the entire
- FASTA header following the '>' character
- """
- return self._name
- @property
- def id(self):
- """
- The id of the sequence in the FASTA file, equal to the FASTA header
- up to the first whitespace.
- """
- return self._id
- @property
- def metadata(self):
- """
- The metadata associated with the sequence in the FASTA file, equal to
- the contents of the FASTA header following the first whitespace
- """
- return self._metadata
- @property
- def sequence(self):
- """
- The sequence for the record as present in the FASTA file.
- (Newlines are removed but otherwise no sequence normalization
- is performed).
- """
- return self._sequence
- @property
- def length(self):
- """
- Get the length of the FASTA sequence
- """
- return len(self._sequence)
- @property
- def md5(self):
- """
- The MD5 checksum (hex digest) of `sequence`
- """
- return self._md5
- @classmethod
- def fromString(cls, s):
- """
- Interprets a string as a FASTA record. Does not make any
- assumptions about wrapping of the sequence string.
- """
- try:
- lines = s.splitlines()
- assert len(lines) > 1
- assert lines[0][0] == cls.DELIMITER
- name = lines[0][1:]
- sequence = "".join(lines[1:])
- return FastaRecord(name, sequence)
- except AssertionError:
- raise ValueError("String not recognized as a valid FASTA record")
- def __eq__(self, other):
- if isinstance(other, self.__class__):
- return (self.name == other.name and
- self._sequence == other._sequence)
- else:
- return False
- def __ne__(self, other):
- return not self.__eq__(other)
- def __str__(self):
- """
- Output a string representation of this FASTA record, observing
- standard conventions about sequence wrapping.
- """
- return (">%s\n" % self.name) + \
- wrap(self._sequence, self.COLUMNS)
- # These are refactored from ReaderBase/FastaReader.
- def yield_fasta_record(f, fn=None, log=LOG.info):
- """
- f: fileobj
- fn: str - filename (for exceptions); inferred from f.name if not provided
- """
- if not fn and f is not sys.stdin:
- fn = getattr(f, 'name', None)
- if fn is not None and not os.path.exists(fn):
- log('Not sure what to do with FASTA file name="{}"'.format(fn))
- fn = ''
- counter = FilePercenter(fn, log=log)
- try:
- parts = splitFileContents(f, ">")
- assert "" == next(parts)
- for part in parts:
- counter(len(part))
- yield FastaRecord.fromString(">" + part)
- except AssertionError:
- raise Exception("Invalid FASTA file {!r}".format(fn))
- def yield_fasta_records(f, fn=None, log=LOG.info):
- warnings.warn('use yield_fasta_record() instead', DeprecationWarning)
- return yield_fasta_record(f, fn, log)
- def stream_stdout(call, fn):
- args = call.split()
- proc = subprocess.Popen(args, stdin=open(fn), stdout=subprocess.PIPE)
- return proc.stdout
- @contextlib.contextmanager
- def open_fasta_reader(fn, log=LOG.info):
- """
- fn: str - filename
- Note: If you already have a fileobj, you can iterate over yield_fasta_record() directly.
- Streaming reader for FASTA files, useable as a one-shot iterator
- over FastaRecord objects. Agnostic about line wrapping.
- Example:
- .. doctest::
- TODO: Get data.
- > from pbcore import data
- > filename = data.getTinyFasta()
- > r = FastaReader(filename)
- > with open_fasta_reader(filename) as r:
- ... for record in r:
- ... print record.name, len(record.sequence), record.md5
- ref000001|EGFR_Exon_2 183 e3912e9ceacd6538ede8c1b2adda7423
- ref000002|EGFR_Exon_3 203 4bf218da37175a91869033024ac8f9e9
- ref000003|EGFR_Exon_4 215 245bc7a046aad0788c22b071ed210f4d
- ref000004|EGFR_Exon_5 157 c368b8191164a9d6ab76fd328e2803ca
- """
- filename = abspath(expanduser(fn))
- mode = 'r'
- if filename.endswith(".gz"):
- ofs = gzip.open(filename, mode)
- elif filename.endswith(".dexta"):
- ofs = stream_stdout("undexta -vkU -w60 -i", filename)
- elif '-' == fn:
- ofs = sys.stdin
- filename = fn
- else:
- ofs = open(filename, mode)
- yield yield_fasta_record(ofs, filename, log=log)
- ofs.close()
- @contextlib.contextmanager
- def open_fasta_writer(fn, log=LOG.info):
- """
- fn: str - filename
- Yield a writer, possibly compressing. This is not actually
- fasta-specific, except it can also write Gene Myers "dexta".
- Wraps as the default COLUMNS=60.
- Example:
- with open_fasta_reader(ifn) as rin:
- with open_fasta_writer(ofn) as writer
- for record in rin:
- writer.write(str(record))
- """
- filename = abspath(expanduser(fn))
- if filename.endswith(".gz"):
- ofs = gzip.open(filename, 'wb')
- elif filename.endswith(".dexta"):
- ofs = stream_stdout("dexta -vk -i", filename)
- elif '-' == fn:
- ofs = sys.stdout
- filename = fn
- else:
- ofs = open(filename, 'w')
- yield ofs
- ofs.close()
- class FastaReader(object):
- """Deprecated, but should still work (with filenames).
- """
- def __iter__(self):
- with open_fasta_reader(self.filename, log=self.log) as reader:
- for rec in reader:
- yield rec
- def __init__(self, f, log=LOG.info):
- self.filename = f
- self.log = log
|