# generic_tar_uows.py (2.2 KB)
  1. import argparse
  2. import collections
  3. import glob
  4. import logging
  5. import os
  6. import sys
  7. import pypeflow.do_task
  8. from .. import io
  # Module-wide logger. getLogger() called without a name returns the root logger.
  LOG = logging.getLogger()
  def tar_uows(fn, uows):
      """Write `uows` into a staging directory and archive that directory as `fn`.

      The staging directory is `fn` with its extension stripped. It receives a
      single file, 'some-units-of-work.json', holding the serialized `uows`;
      the directory is then archived with `tar -cf` and handed to `io.rmdirs`.

      The staging directory is named after `fn` rather than being unique, so
      this is not thread-safe.
      """
      staging_dir, _ext = os.path.splitext(fn)
      io.mkdirs(staging_dir)  # permissions?
      with io.cd(staging_dir):
          # We could include other files here, or at least symlinks, but not today.
          # Soon, we will construct the uow-subdirs here, but we must consider clobbering.
          io.serialize('some-units-of-work.json', uows)
      # NOTE(review): the tar step is assumed to run outside the `io.cd` block,
      # since it names the staging directory by its outer-relative path -- confirm.
      archive_cmd = 'tar -cf {} {}'.format(fn, staging_dir)
      io.syscall(archive_cmd)
      io.rmdirs(staging_dir)
  def yield_uows(n, all_uows):
      """Yield consecutive chunks of `all_uows`, producing at most `n` chunks.

      Every chunk is a non-empty list. All chunks hold
      ceil(len(all_uows) / n) units of work, except the last, which may be
      shorter. The original order of the units is preserved.

      Args:
          n: Maximum number of chunks to produce.
          all_uows: Sized iterable of units of work.

      Yields:
          list: The next chunk of units of work.

      Raises:
          ValueError: If `all_uows` is non-empty and `n` is less than 1.
      """
      pending = list(all_uows)
      if not pending:
          # Nothing to split. Returning here also avoids dividing by n == 0,
          # which is what a caller computing n = min(limit, 0) would pass.
          return
      if n < 1:
          raise ValueError(
              'Cannot split {} units-of-work into {} chunks.'.format(len(pending), n))
      # Ceiling division; `//` keeps the result an int on both Python 2 and 3.
      uows_per_chunk = (len(pending) + n - 1) // n
      for start in range(0, len(pending), uows_per_chunk):
          yield pending[start:start + uows_per_chunk]
  def run(all_uow_list_fn, pattern, nchunks_max):
      """Load the units-of-work listed in `all_uow_list_fn` and tar them in chunks.

      The chunk count requested from `yield_uows` is the smaller of
      `nchunks_max` and the number of loaded units. For each chunk produced,
      the output name is `pattern` with every '%' replaced by the chunk's
      zero-padded (two-digit minimum) index, and the chunk is handed to
      `tar_uows`.
      """
      every_uow = io.deserialize(all_uow_list_fn)
      nchunks = min(nchunks_max, len(every_uow))
      LOG.info('Num chunks = {} (<= {})'.format(nchunks, nchunks_max))
      for index, chunk in enumerate(yield_uows(nchunks, every_uow)):
          suffix = '{:02d}'.format(index)
          tar_fn = pattern.replace('%', suffix)
          progress = 'Writing {} units-of-work to "{}" ({}).'.format(len(chunk), tar_fn, suffix)
          LOG.info(progress)
          tar_uows(tar_fn, chunk)
  class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
      """Help formatter that keeps help text as written and appends argument defaults."""
  def parse_args(argv):
      """Parse the command line for this tool.

      Args:
          argv: Full argument vector. argv[0] (the program name) is skipped.

      Returns:
          argparse.Namespace with attributes `all_uow_list_fn`, `nchunks_max`
          (int) and `pattern`.

      All three options are required: the namespace is expanded directly into
      keyword arguments by the caller, so a missing option would otherwise
      surface later as a None value. When one is missing, argparse prints a
      usage error and exits.
      """
      description = 'Split a JSON list of units-of-work into up to N files ("chunks"), still as lists of units-of-work.'
      epilog = ''
      parser = argparse.ArgumentParser(
          description=description,
          epilog=epilog,
          formatter_class=HelpF,
      )
      parser.add_argument(
          '--all-uow-list-fn', required=True,
          help='Input. JSON list of units of work.')
      parser.add_argument(
          '--nchunks_max', type=int, required=True,
          help='Input. Maximum number of output files.')
      parser.add_argument(
          '--pattern', required=True,
          # argparse %-formats help strings, so a literal percent sign must be
          # written as '%%'; a bare '%' makes help rendering raise ValueError.
          help='Output. The "%%" will be replaced by a zero-padded number. (These will be tar-files, so it should probably end in ".tar".)')
      args = parser.parse_args(argv[1:])
      return args
  def main(argv=sys.argv):
      """Command-line entry point.

      Parses `argv`, configures logging at INFO level, then calls `run` with
      the parsed options expanded as keyword arguments.
      """
      parsed = parse_args(argv)
      logging.basicConfig(level=logging.INFO)
      run_kwargs = vars(parsed)
      run(**run_kwargs)
  # Invoke the command-line entry point only when this file is executed as a
  # script; importing the module does not run anything.
  if __name__ == '__main__': # pragma: no cover
      main()