generic_scatter_uows_tar.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import argparse
  2. import collections
  3. import glob
  4. import logging
  5. import os
  6. import sys
  7. from .. import io
# Module-wide logger. Calling getLogger() without a name returns the root
# logger, so these messages follow whatever main() sets via basicConfig().
LOG = logging.getLogger()
  9. def yield_uows(n, all_uows):
  10. """
  11. >>> list(yield_uows(2, [0,1,2,3,4]))
  12. [[0, 1, 2], [3, 4]]
  13. """
  14. # yield exactly n sublists.
  15. total = len(all_uows)
  16. remaining = total
  17. it = iter(all_uows)
  18. while n and remaining:
  19. taken = (remaining + n - 1) // n
  20. to_yield = [next(it) for _ in range(taken)]
  21. yield to_yield
  22. remaining -= taken
  23. n -= 1
  24. def move_into_tar(dn, fns):
  25. # Create directory 'dn'.
  26. # Move files (or dir-trees) into directory 'dn', and tar it.
  27. # By convention, for tar-file "foo.tar", we first move everything into a directory named "foo".
  28. io.mkdirs(dn)
  29. for fn in fns:
  30. cmd = 'mv {} {}'.format(fn, dn)
  31. io.syscall(cmd)
  32. tar_fn = '{}.tar'.format(dn)
  33. #with tarfile.TarFile(tar_fn, 'w', dereference=False, ignore_zeros=True, errorlevel=2) as tf:
  34. # tf.add(dn)
  35. cmd = 'tar cvf {} {}'.format(tar_fn, dn)
  36. io.syscall(cmd)
  37. io.rmdirs(dn)
  38. def dir_from_tar(tar_fn):
  39. return os.path.splitext(os.path.basename(tar_fn))[0]
  40. def run(all_uows_tar_fn, pattern, nchunks_max):
  41. cmd = 'tar -xvf {}'.format(all_uows_tar_fn)
  42. io.syscall(cmd)
  43. all_uows_dn = dir_from_tar(all_uows_tar_fn)
  44. all_uows = list(sorted(glob.glob('{}/uow-*'.format(all_uows_dn))))
  45. n = min(nchunks_max, len(all_uows))
  46. LOG.info('Num chunks = {} (<= {})'.format(n, nchunks_max))
  47. for i, uows in enumerate(yield_uows(n, all_uows)):
  48. key = '{:02d}'.format(i)
  49. fn = pattern.replace('%', key)
  50. LOG.info('Writing {} units-of-work to "{}" ({}).'.format(len(uows), fn, key))
  51. dn = dir_from_tar(fn)
  52. move_into_tar(dn, uows)
  53. cmd = 'rmdir {}'.format(all_uows_dn)
  54. io.syscall(cmd)
  55. class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
  56. pass
  57. def parse_args(argv):
  58. description = 'Scatter a single unit-of-work from many units-of-work.'
  59. epilog = ''
  60. parser = argparse.ArgumentParser(
  61. description=description,
  62. epilog=epilog,
  63. formatter_class=HelpF,
  64. )
  65. parser.add_argument(
  66. '--all-uows-tar-fn',
  67. help='Input. Tarfile of all units of work directories.')
  68. parser.add_argument(
  69. '--nchunks-max', type=int,
  70. help='Input. Maximum number of output files.')
  71. parser.add_argument(
  72. '--pattern',
  73. help='Output. The "%" will be replaced by a zero-padded number. (Probably should be ".tar")')
  74. args = parser.parse_args(argv[1:])
  75. return args
  76. def main(argv=sys.argv):
  77. args = parse_args(argv)
  78. logging.basicConfig(level=logging.INFO)
  79. run(**vars(args))
  80. if __name__ == '__main__': # pragma: no cover
  81. main()