io.py

  1. """I/O utilities
  2. Not specific to FALCON.
  3. """
  4. #from builtins import str
  5. from builtins import object
  6. import contextlib
  7. import os
  8. import resource
  9. import shlex
  10. import shutil
  11. import subprocess as sp
  12. import sys
  13. import tempfile
  14. import traceback
  15. from ..io import deserialize


def write_nothing(*args):
    """
    To use,
        LOG = write_nothing
    """


def write_with_pid(*args):
    msg = '[%d]%s\n' % (os.getpid(), ' '.join(args))
    sys.stderr.write(msg)


LOG = write_with_pid


def logstats():
    """This is useful as an 'atexit' handler.
    """
    LOG('maxrss:%9d' % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))


def reprarg(arg):
    """Reduce the size of repr()
    """
    if isinstance(arg, str):
        if len(arg) > 100:
            return '{}...({})'.format(arg[:100], len(arg))
    elif (isinstance(arg, set) or isinstance(arg, list)
          or isinstance(arg, tuple) or isinstance(arg, dict)):
        if len(arg) > 9:
            return '%s(%d elem)' % (type(arg).__name__, len(arg))
        else:
            return '<' + ', '.join(reprarg(a) for a in arg) + '>'
    return repr(arg)


def run_func(args):
    """Wrap multiprocessing.Pool calls.
    Usage:
        pool.imap(run_func, [(func, arg0, arg1, ...), ...])
    """
    func = args[0]
    try:
        func_name = func.__name__
    except AttributeError:
        # func must be pickle-able, so this should never happen.
        func_name = repr(func)
    args = args[1:]
    try:
        LOG('starting %s(%s)' % (func_name, ', '.join(reprarg(a) for a in args)))
        logstats()
        ret = func(*args)
        logstats()
        LOG('finished %s(%s)' % (func_name, ', '.join(reprarg(a) for a in args)))
        return ret
    except Exception:
        raise Exception(traceback.format_exc())
    except:  # KeyboardInterrupt, SystemExit
        LOG('interrupted %s(%s)' %
            (func_name, ', '.join(reprarg(a) for a in args)))
        return
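

# Illustrative sketch: driving run_func from a multiprocessing.Pool.
# 'double' is a hypothetical function, assumed to be defined at module
# scope so it pickles; the numbers are arbitrary.
#
#   import multiprocessing
#
#   def double(x):
#       return 2 * x
#
#   pool = multiprocessing.Pool(2)
#   tasks = [(double, i) for i in range(4)]       # each task is (func, arg0, arg1, ...)
#   results = list(pool.imap(run_func, tasks))    # -> [0, 2, 4, 6]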


def system(call, check=False):
    LOG('$(%s)' % repr(call))
    rc = os.system(call)
    msg = "Call %r returned %d." % (call, rc)
    if rc:
        LOG("WARNING: " + msg)
        if check:
            raise Exception(msg)
    else:
        LOG(msg)
    return rc


def syscall(cmd):
    """Return stdout, fully captured.
    Wait for the subprocess to finish.
    Log a warning if the output is empty.
    Raise on non-zero exit-code.
    """
    LOG('$ {!r} >'.format(cmd))
    output = sp.check_output(shlex.split(cmd), encoding='ascii')
    if not output:
        msg = '%r failed to produce any output.' % cmd
        LOG('WARNING: %s' % msg)
    return output


def slurplines(cmd):
    return syscall(cmd).splitlines()


def streamlines(cmd):
    """Stream stdout from cmd.
    Let stderr fall through.
    The returned reader will stop yielding when the subprocess exits.
    Note: We do not detect a failure in the underlying process.
    """
    LOG('$ %s |' % cmd)
    proc = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, encoding='ascii')
    return proc.stdout
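

# Illustrative sketch of the shell helpers above ('ls -l' is an arbitrary example command):
#
#   rc = system('ls > /dev/null')       # log and return the exit code
#   text = syscall('ls -l')             # full stdout as one str
#   lines = slurplines('ls -l')         # stdout as a list of lines
#   for line in streamlines('ls -l'):   # stdout streamed line by line
#       print(line.rstrip())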


class DataReaderContext(object):
    def readlines(self):
        output = self.data.strip()
        for line in output.splitlines():
            yield line

    def __enter__(self):
        pass

    def __exit__(self, *args):
        self.returncode = 0

    def __init__(self, data):
        self.data = data
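

# Illustrative sketch: DataReaderContext offers the same reader interface as the
# process-reader contexts below, but for data already held in memory.
#
#   reader = DataReaderContext('a\nb\nc\n')
#   with reader:
#       lines = list(reader.readlines())   # -> ['a', 'b', 'c']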


class ProcessReaderContext(object):
    """Prefer this to slurplines() or streamlines().
    """
    def readlines(self):
        """Generate lines of native str.
        """
        # In py2, not unicode.
        raise NotImplementedError()

    def __enter__(self):
        LOG('{!r}'.format(self.cmd))
        self.proc = sp.Popen(shlex.split(self.cmd), stdout=sp.PIPE,
                             universal_newlines=True, encoding='ascii')

    def __exit__(self, etype, evalue, etb):
        if etype is None:
            self.proc.wait()
        else:
            # Exception was raised in the with-block.
            # We cannot wait on proc b/c it might never finish!
            pass
        self.returncode = self.proc.returncode
        if self.returncode:
            msg = "%r <- %r" % (self.returncode, self.cmd)
            raise Exception(msg)
        del self.proc

    def __init__(self, cmd):
        self.cmd = cmd


def splitlines_iter(text):
    """This is the same as splitlines, but as a generator.
    """
    # https://stackoverflow.com/questions/3054604/iterate-over-the-lines-of-a-string
    assert isinstance(text, str)
    prevnl = -1
    while True:
        nextnl = text.find('\n', prevnl + 1)  # u'\n' would force unicode
        if nextnl < 0:
            break
        yield text[prevnl + 1:nextnl]
        prevnl = nextnl
    if (prevnl + 1) != len(text):
        yield text[prevnl + 1:]


class CapturedProcessReaderContext(ProcessReaderContext):
    def readlines(self):
        """Usage:

            cmd = 'ls -l'
            reader = CapturedProcessReaderContext(cmd)
            with reader:
                for line in reader.readlines():
                    print(line)

        Any exception within the 'with-block' is propagated.
        Otherwise, after all lines are read, if 'cmd' failed, an Exception is raised.
        """
        output, _ = self.proc.communicate()
        # Process has terminated by now, so we can iterate without keeping it alive.
        #for line in splitlines_iter(str(output, 'utf-8')):
        for line in splitlines_iter(output):
            yield line


class StreamedProcessReaderContext(ProcessReaderContext):
    def readlines(self):
        """Usage:

            cmd = 'ls -l'
            reader = StreamedProcessReaderContext(cmd)
            with reader:
                for line in reader.readlines():
                    print(line)

        Any exception within the 'with-block' is propagated.
        Otherwise, after all lines are read, if 'cmd' failed, an Exception is raised.
        """
        for line in self.proc.stdout:
            # We expect unicode from py3 but raw-str from py2, given
            # universal_newlines=True.
            # Based on source-code in 'future/types/newstr.py',
            # it seems that str(str(x)) has no extra penalty,
            # and it should not crash either. Anyway,
            # our tests would catch it.
            #yield str(line, 'utf-8').rstrip()
            yield line.rstrip()
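

# Choosing between the two readers (sketch): CapturedProcessReaderContext buffers
# all stdout before yielding, which suits short commands; StreamedProcessReaderContext
# yields as the subprocess writes, which suits long-running or large-output commands.
# Note that __enter__() returns None, so use 'with reader:' rather than 'with ... as reader:'.
#
#   reader = StreamedProcessReaderContext('ls -l')   # or CapturedProcessReaderContext
#   with reader:
#       for line in reader.readlines():
#           print(line)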


def filesize(fn):
    """In bytes.
    Raise if fn does not exist.
    """
    statinfo = os.stat(fn)
    return statinfo.st_size


def validated_fns(fofn):
    return list(yield_validated_fns(fofn))


def yield_validated_fns(fofn):
    """Yield filenames from fofn, either absolute or relative to CWD (not to the dir of fofn).
    Assert none are empty/non-existent.
    """
    dirname = os.path.normpath(os.path.dirname(os.path.realpath(fofn)))  # normpath makes '' become '.'
    try:
        fns = deserialize(fofn)
    except Exception:
        #LOG('las fofn {!r} does not seem to be JSON or msgpack; try to switch, so we can detect truncated files.'.format(fofn))
        fns = open(fofn).read().strip().split()
    try:
        for fn in fns:
            assert fn
            if not os.path.isabs(fn):
                fn = os.path.normpath(os.path.relpath(os.path.join(dirname, fn)))
            assert os.path.isfile(fn), 'File {!r} is not a file.'.format(fn)
            assert filesize(fn), '{!r} has size {}'.format(fn, filesize(fn))
            yield fn
    except Exception:
        sys.stderr.write('Failed to validate FOFN {!r}\n'.format(fofn))
        raise
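

# Illustrative sketch: a FOFN ("file of file names") is either a plain-text file
# with one path per line or a JSON/msgpack list readable by deserialize().
# Relative paths are interpreted relative to the FOFN's directory. 'my.fofn'
# and its entries are hypothetical names.
#
#   # my.fofn:
#   #   reads/subreads1.fasta
#   #   /abs/path/subreads2.fasta
#   fns = validated_fns('my.fofn')   # raises if any listed file is missing or empty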


@contextlib.contextmanager
def TemporaryDirectory():
    name = tempfile.mkdtemp()
    LOG('TemporaryDirectory={!r}'.format(name))
    try:
        yield name
    finally:
        shutil.rmtree(name)
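

# Illustrative sketch: like tempfile.TemporaryDirectory(), but the path is logged
# and the tree is always removed, even on error.
#
#   with TemporaryDirectory() as tmpdir:
#       with open(os.path.join(tmpdir, 'scratch.txt'), 'w') as f:
#           f.write('hello')
#   # tmpdir has been deleted here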