1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- import argparse
- import collections
- import glob
- import logging
- import os
- import sys
- import pypeflow.do_task
- from .. import io
- LOG = logging.getLogger()
- def tar_uows(fn, uows):
- # Operate in a subdir. (Named, so not thread-safe.)
- subdir = os.path.splitext(fn)[0]
- io.mkdirs(subdir) # permissions?
- with io.cd(subdir):
- # We could include other files here, or at least symlinks, but not today.
- # Soon, we will construct the uow-subdirs here, but we must consider clobbering.
- io.serialize('some-units-of-work.json', uows)
- cmd = 'tar -cf {} {}'.format(fn, subdir)
- io.syscall(cmd)
- io.rmdirs(subdir)
- def yield_uows(n, all_uows):
- uows_per_chunk = (len(all_uows) + n - 1) / n
- for uow in all_uows:
- yield [uow]
- def run(all_uow_list_fn, pattern, nchunks_max):
- all_uows = io.deserialize(all_uow_list_fn)
- n = min(nchunks_max, len(all_uows))
- LOG.info('Num chunks = {} (<= {})'.format(n, nchunks_max))
- for i, uows in enumerate(yield_uows(n, all_uows)):
- key = '{:02d}'.format(i)
- fn = pattern.replace('%', key)
- LOG.info('Writing {} units-of-work to "{}" ({}).'.format(len(uows), fn, key))
- tar_uows(fn, uows)
- class HelpF(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
- pass
- def parse_args(argv):
- description = 'Split a JSON list of units-of-work into up to N files ("chunks"), still as lists of units-of-work.'
- epilog = ''
- parser = argparse.ArgumentParser(
- description=description,
- epilog=epilog,
- formatter_class=HelpF,
- )
- parser.add_argument(
- '--all-uow-list-fn',
- help='Input. JSON list of units of work.')
- parser.add_argument(
- '--nchunks_max', type=int,
- help='Input. Maximum number of output files.')
- parser.add_argument(
- '--pattern',
- help='Output. The "%" will be replace by a zero-padded number. (These will be a tar-files, so it should probably end in ".tar".')
- args = parser.parse_args(argv[1:])
- return args
- def main(argv=sys.argv):
- args = parse_args(argv)
- logging.basicConfig(level=logging.INFO)
- run(**vars(args))
- if __name__ == '__main__': # pragma: no cover
- main()
|