io.py

from pypeflow.io import (
        syscall, capture, cd,
        mkdirs, symlink, rm, touch, filesize, exists_and_not_empty)  # needed?
import contextlib
import io
import logging
import os
import pprint
import sys

if sys.version_info >= (3, 0):
    NativeIO = io.StringIO
else:
    NativeIO = io.BytesIO

LOG = logging.getLogger()


def log(*msgs):
    LOG.debug(' '.join(repr(m) for m in msgs))


def eng(number):
    return '{:.1f}MB'.format(number / 2**20)
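

# Worked example: eng(5 * 2**20) returns '5.0MB' (2**20 bytes is one MiB).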


class Percenter(object):
    """Report progress at exponentially growing intervals ("golden exponential").

    Usage:
        counter = Percenter('mystruct', total_len(mystruct))
        for rec in mystruct:
            counter(len(rec))
    """
    def __init__(self, name, total, log=LOG.info, units='units'):
        if sys.maxsize == total:
            log('Counting {} from "{}"'.format(units, name))
        else:
            log('Counting {:,d} {} from\n "{}"'.format(total, units, name))
        self.total = total
        self.log = log
        self.name = name
        self.units = units
        self.call = 0
        self.count = 0
        self.next_count = 0
        self.a = 1  # double each time

    def __call__(self, more, label=''):
        self.call += 1
        self.count += more
        if self.next_count <= self.count:
            self.a = 2 * self.a
            self.a = max(self.a, more)
            self.a = min(self.a, (self.total - self.count), round(self.total / 10.0))
            self.next_count = self.count + self.a
            if self.total == sys.maxsize:
                msg = '{:>10} count={:15,d} {}'.format(
                    '#{:,d}'.format(self.call), self.count, label)
            else:
                msg = '{:>10} count={:15,d} {:6.02f}% {}'.format(
                    '#{:,d}'.format(self.call), self.count, 100.0 * self.count / self.total, label)
            self.log(msg)

    def finish(self):
        self.log('Counted {:,d} {} in {} calls from:\n "{}"'.format(
            self.count, self.units, self.call, self.name))
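

# A minimal usage sketch for Percenter ('records' is a hypothetical sequence
# of strings): report progress while consuming it, given a precomputed total.
def _example_percenter(records):
    counter = Percenter('records', sum(len(r) for r in records))
    for rec in records:
        counter(len(rec))
    counter.finish()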


def FilePercenter(fn, log=LOG.info):
    if '-' == fn or not fn:
        size = sys.maxsize
    else:
        size = filesize(fn)
        if fn.endswith('.dexta'):
            size = size * 4
        elif fn.endswith('.gz'):
            size = sys.maxsize  # probably 2.8x to 3.2x, but we are not sure, and higher is better than lower
            # https://stackoverflow.com/a/22348071
            # https://jira.pacificbiosciences.com/browse/TAG-2836
    return Percenter(fn, size, log, units='bytes')
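

# Sketch of FilePercenter driven by hand ('reads.txt' is a hypothetical path);
# open_progress() below wraps this same pattern in a context manager.
def _example_file_percenter(fn='reads.txt'):
    fp = FilePercenter(fn)
    with open(fn) as stream:
        for line in stream:
            fp(len(line))
    fp.finish()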


@contextlib.contextmanager
def open_progress(fn, mode='r', log=LOG.info):
    """
    Usage:
        with open_progress('foo', log=LOG.info) as stream:
            for line in stream:
                use(line)

    That will log progress lines.
    """
    def get_iter(stream, progress):
        for line in stream:
            progress(len(line))
            yield line

    fp = FilePercenter(fn, log=log)
    with open(fn, mode=mode) as stream:
        yield get_iter(stream, fp)
    fp.finish()
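

# The same loop via open_progress(); the 'Counted ...' summary is logged
# automatically when the with-block exits ('reads.txt' is again hypothetical).
def _example_open_progress(fn='reads.txt'):
    with open_progress(fn) as stream:
        total = sum(len(line) for line in stream)
    LOG.info('Read {} bytes'.format(total))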


def read_as_msgpack(bytestream):
    import msgpack
    content = bytestream.read()
    log(' Read {} as msgpack'.format(eng(len(content))))
    return msgpack.unpackb(content, raw=False,
        max_map_len=2**25,
        max_array_len=2**25,
    )


def read_as_json(bytestream):
    import json
    content = bytestream.read().decode('ascii')
    log(' Read {} as json'.format(eng(len(content))))
    return json.loads(content)


def write_as_msgpack(bytestream, val):
    import msgpack
    content = msgpack.packb(val)
    log(' Serialized to {} as msgpack'.format(eng(len(content))))
    bytestream.write(content)


def write_as_json(bytestream, val):
    import json
    content = json.dumps(val, indent=2, separators=(',', ': ')).encode('ascii')
    log(' Serialized to {} as json'.format(eng(len(content))))
    bytestream.write(content)
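

# In-memory round-trip sketch for the reader/writer pairs above; io.BytesIO
# stands in for a real file, and the payload is made up for illustration.
def _example_json_roundtrip():
    buf = io.BytesIO()
    write_as_json(buf, {'a': 1})
    buf.seek(0)
    assert read_as_json(buf) == {'a': 1}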


def deserialize(fn):
    log('Deserializing from {!r}'.format(fn))
    with open(fn, 'rb') as ifs:
        log(' Opened for read: {!r}'.format(fn))
        if fn.endswith('.msgpack'):
            val = read_as_msgpack(ifs)
        elif fn.endswith('.json'):
            val = read_as_json(ifs)
        else:
            raise Exception('Unknown extension for {!r}'.format(fn))
    log(' Deserialized {} records'.format(len(val)))
    return val


def serialize(fn, val):
    """Write 'val' to 'fn', creating the dirname if needed.
    The extension ('.msgpack' or '.json') selects the format.
    """
    log('Serializing {} records'.format(len(val)))
    mkdirs(os.path.dirname(fn))
    with open(fn, 'wb') as ofs:
        log(' Opened for write: {!r}'.format(fn))
        if fn.endswith('.msgpack'):
            write_as_msgpack(ofs, val)
        elif fn.endswith('.json'):
            write_as_json(ofs, val)
            ofs.write(b'\n')  # for vim
        else:
            raise Exception('Unknown extension for {!r}'.format(fn))
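

# On-disk round-trip sketch; the path here is hypothetical, and the extension
# picks the format in both directions.
def _example_serialize_roundtrip(fn='tmp/example.json'):
    records = [{'name': 'r1'}, {'name': 'r2'}]
    serialize(fn, records)  # creates 'tmp/' via mkdirs()
    assert deserialize(fn) == records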


def yield_abspath_from_fofn(fofn_fn):
    """Yield each filename.
    Relative paths are resolved from the FOFN directory.
    'fofn_fn' can be .fofn, .json, .msgpack
    """
    try:
        fns = deserialize(fofn_fn)
    except Exception:
        #LOG('las fofn {!r} does not seem to be JSON; try to switch, so we can detect truncated files.'.format(fofn_fn))
        fns = open(fofn_fn).read().strip().split()
    try:
        basedir = os.path.dirname(fofn_fn)
        for fn in fns:
            if not os.path.isabs(fn):
                fn = os.path.abspath(os.path.join(basedir, fn))
            yield fn
    except Exception:
        LOG.error('Problem resolving paths in FOFN {!r}'.format(fofn_fn))
        raise
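

# Usage sketch ('input.fofn' is a hypothetical path): relative entries come
# back resolved against the directory containing the FOFN itself.
def _example_iter_fofn(fofn_fn='input.fofn'):
    for fn in yield_abspath_from_fofn(fofn_fn):
        LOG.info('FOFN entry: {!r}'.format(fn))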


def rmdirs(*dirnames):
    for d in dirnames:
        assert os.path.normpath(d.strip()) not in ['.', '', '/']
    syscall('rm -rf {}'.format(' '.join(dirnames)))


def rmdir(d):
    rmdirs(d)


def rm_force(*fns):
    for fn in fns:
        if os.path.exists(fn):
            os.unlink(fn)
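

# Sketch of the cleanup helpers; both are destructive, so these paths are
# deliberately hypothetical. rmdirs() refuses '.', '', and '/'.
def _example_cleanup():
    rm_force('a.tmp', 'b.tmp')  # unlink each file only if it exists
    rmdirs('scratch/run0')      # shells out to 'rm -rf'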