123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
import contextlib
import errno
import io
import logging
import os
import pprint
import sys

from pypeflow.io import (
    syscall, capture, cd,
    mkdirs, symlink, rm, touch, filesize, exists_and_not_empty)  # needed?
- if sys.version_info >= (3, 0):
- NativeIO = io.StringIO
- else:
- NativeIO = io.BytesIO
- LOG = logging.getLogger()
- def log(*msgs):
- LOG.debug(' '.join(repr(m) for m in msgs))
- def eng(number):
- return '{:.1f}MB'.format(number / 2**20)
- class Percenter(object):
- """Report progress by golden exponential.
- Usage:
- counter = Percenter('mystruct', total_len(mystruct))
- for rec in mystruct:
- counter(len(rec))
- """
- def __init__(self, name, total, log=LOG.info, units='units'):
- if sys.maxsize == total:
- log('Counting {} from "{}"'.format(units, name))
- else:
- log('Counting {:,d} {} from\n "{}"'.format(total, units, name))
- self.total = total
- self.log = log
- self.name = name
- self.units = units
- self.call = 0
- self.count = 0
- self.next_count = 0
- self.a = 1 # double each time
- def __call__(self, more, label=''):
- self.call += 1
- self.count += more
- if self.next_count <= self.count:
- self.a = 2 * self.a
- self.a = max(self.a, more)
- self.a = min(self.a, (self.total-self.count), round(self.total/10.0))
- self.next_count = self.count + self.a
- if self.total == sys.maxsize:
- msg = '{:>10} count={:15,d} {}'.format(
- '#{:,d}'.format(self.call), self.count, label)
- else:
- msg = '{:>10} count={:15,d} {:6.02f}% {}'.format(
- '#{:,d}'.format(self.call), self.count, 100.0*self.count/self.total, label)
- self.log(msg)
- def finish(self):
- self.log('Counted {:,d} {} in {} calls from:\n "{}"'.format(
- self.count, self.units, self.call, self.name))
- def FilePercenter(fn, log=LOG.info):
- if '-' == fn or not fn:
- size = sys.maxsize
- else:
- size = filesize(fn)
- if fn.endswith('.dexta'):
- size = size * 4
- elif fn.endswith('.gz'):
- size = sys.maxsize # probably 2.8x to 3.2x, but we are not sure, and higher is better than lower
- # https://stackoverflow.com/a/22348071
- # https://jira.pacificbiosciences.com/browse/TAG-2836
- return Percenter(fn, size, log, units='bytes')
- @contextlib.contextmanager
- def open_progress(fn, mode='r', log=LOG.info):
- """
- Usage:
- with open_progress('foo', log=LOG.info) as stream:
- for line in stream:
- use(line)
- That will log progress lines.
- """
- def get_iter(stream, progress):
- for line in stream:
- progress(len(line))
- yield line
- fp = FilePercenter(fn, log=log)
- with open(fn, mode=mode) as stream:
- yield get_iter(stream, fp)
- fp.finish()
- def read_as_msgpack(bytestream):
- import msgpack
- content = bytestream.read()
- log(' Read {} as msgpack'.format(eng(len(content))))
- return msgpack.unpackb(content, raw=False,
- max_map_len=2**25,
- max_array_len=2**25,
- )
- def read_as_json(bytestream):
- import json
- content = bytestream.read().decode('ascii')
- log(' Read {} as json'.format(eng(len(content))))
- return json.loads(content)
- def write_as_msgpack(bytestream, val):
- import msgpack
- content = msgpack.packb(val)
- log(' Serialized to {} as msgpack'.format(eng(len(content))))
- bytestream.write(content)
- def write_as_json(bytestream, val):
- import json
- content = json.dumps(val, indent=2, separators=(',', ': ')).encode('ascii')
- log(' Serialized to {} as json'.format(eng(len(content))))
- bytestream.write(content)
- def deserialize(fn):
- log('Deserializing from {!r}'.format(fn))
- with open(fn, 'rb') as ifs:
- log(' Opened for read: {!r}'.format(fn))
- if fn.endswith('.msgpack'):
- val = read_as_msgpack(ifs)
- elif fn.endswith('.json'):
- val = read_as_json(ifs)
- else:
- raise Exception('Unknown extension for {!r}'.format(fn))
- log(' Deserialized {} records'.format(len(val)))
- return val
- def serialize(fn, val):
- """Assume dirname exists.
- """
- log('Serializing {} records'.format(len(val)))
- mkdirs(os.path.dirname(fn))
- with open(fn, 'wb') as ofs:
- log(' Opened for write: {!r}'.format(fn))
- if fn.endswith('.msgpack'):
- write_as_msgpack(ofs, val)
- elif fn.endswith('.json'):
- write_as_json(ofs, val)
- ofs.write(b'\n') # for vim
- else:
- raise Exception('Unknown extension for {!r}'.format(fn))
- def yield_abspath_from_fofn(fofn_fn):
- """Yield each filename.
- Relative paths are resolved from the FOFN directory.
- 'fofn_fn' can be .fofn, .json, .msgpack
- """
- try:
- fns = deserialize(fofn_fn)
- except:
- #LOG('las fofn {!r} does not seem to be JSON; try to switch, so we can detect truncated files.'.format(fofn_fn))
- fns = open(fofn_fn).read().strip().split()
- try:
- basedir = os.path.dirname(fofn_fn)
- for fn in fns:
- if not os.path.isabs(fn):
- fn = os.path.abspath(os.path.join(basedir, fn))
- yield fn
- except Exception:
- LOG.error('Problem resolving paths in FOFN {!r}'.format(fofn_fn))
- raise
- def rmdirs(*dirnames):
- for d in dirnames:
- assert os.path.normpath(d.strip()) not in ['.', '', '/']
- syscall('rm -rf {}'.format(' '.join(dirnames)))
- def rmdir(d):
- rmdirs(d)
- def rm_force(*fns):
- for fn in fns:
- if os.path.exists(fn):
- os.unlink(fn)
|