from .util import (mkdirs, system, touch, run, cd)
import pwatcher.blocking
import pwatcher.fs_based
import pwatcher.network_based
import networkx
import networkx.algorithms.dag #import (topological_sort, is_directed_acyclic_graph)
import hashlib
import json
import logging
import os
import pprint
import random
import re
import sys
import tempfile
import time
import traceback

LOG = logging.getLogger(__name__)
def generate_jobid(node, script_fn):
    # For now, we keep it simple: hash the script contents (which wrap task.json).
    # We truncate the job-name to 15 chars for the sake of some job systems.
    with open(script_fn, 'rb') as sin:
        script_content = sin.read()
    checksum = hashlib.md5(script_content).hexdigest()
    return 'P' + checksum[0:14]
def generate_jobid_alt_given_checksum(script_fn, checksum):
    """
    Note: Unable to run job: denied: "0_raw_fofn_abs_" is not a valid object name (cannot start with a digit).
    So we need a prefix.

    >>> generate_jobid_alt_given_checksum('4-quiver/000000F_002/run.sh', '123')
    'P4_000000F_002_123'
    >>> generate_jobid_alt_given_checksum('4-quiver/quiver_scatter/000000F_002/run.sh', '456')
    'P4_000000F_002_456'
    """
    # TODO: Consider intermediate directories.
    stage = get_stage_char(script_fn)
    basedirname = os.path.basename(os.path.dirname(script_fn))
    return alphanum('P' + stage + '_' + basedirname + '_' + checksum) # without length limit

def generate_jobid_alt(node, script_fn):
    # dgordon suggests this as a preferable alternative.
    with open(script_fn, 'rb') as sin:
        script_content = sin.read()
    checksum = hashlib.md5(script_content).hexdigest()
    return generate_jobid_alt_given_checksum(script_fn, checksum)

JOBID_GENERATORS = [generate_jobid, generate_jobid_alt]
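# A minimal usage sketch (hypothetical path and checksum, not part of the API):
# job_name_style from the config selects an entry in JOBID_GENERATORS (see
# PypeProcWatcherWorkflow below). The pure helper shows the alternative
# name format without reading any file.
def _example_jobid_style():
    generate = JOBID_GENERATORS[1] # 1 -> generate_jobid_alt
    assert generate is generate_jobid_alt
    # The alt style embeds the stage digit and basedir, per the doctests above:
    return generate_jobid_alt_given_checksum('4-quiver/000000F_002/run.sh', 'abc123')
    # -> 'P4_000000F_002_abc123'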
def alphanum(foo):
    """
    >>> alphanum('/foo-bar/')
    '_foo_bar_'
    """
    return foo.replace('/', '_').replace('-', '_')

re_stage_char = re.compile(r'\b(\d)-\w+/')

def get_stage_char(fn):
    """
    >>> get_stage_char('0-hi/x/y')
    '0'
    >>> get_stage_char('x/y')
    'P'
    """
    mo = re_stage_char.search(fn)
    if mo:
        return mo.group(1)
    return 'P'
def endrun(thisnode, status):
    """By convention for now, status is one of:
        'DEAD'
        'UNSUBMITTED' (a pseudo-status defined in the ready-loop of alive())
        'EXIT rc'
    """
    name = status.split()[0]
    if name == 'DEAD':
        LOG.warning(''.join(traceback.format_stack()))
        LOG.error('Task {}\n is DEAD, meaning no HEARTBEAT, but this can be a race-condition. If it was not killed, then restarting might suffice. Otherwise, you might have excessive clock-skew.'.format(thisnode))
        #thisnode.setSatisfied(False) # I think we can leave it unsatisfied. Not sure what happens on retry though.
    elif name == 'UNSUBMITTED':
        LOG.warning(''.join(traceback.format_stack()))
        LOG.error('Task {}\n is UNSUBMITTED, meaning job-submission somehow failed. Possibly a re-start would work. Otherwise, you need to investigate.'.format(thisnode))
        thisnode.setSatisfied(False)
    elif name != 'EXIT':
        raise Exception('Unexpected status {!r}'.format(name))
    else:
        code = int(status.split()[1])
        if 0 != code:
            LOG.error('Task {} failed with exit-code={}'.format(thisnode, code))
            thisnode.setSatisfied(False)
        else:
            thisnode.setSatisfied(True)
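# A minimal sketch of the status protocol endrun() expects from the pwatcher,
# given a hypothetical node object:
def _example_endrun(node):
    endrun(node, 'EXIT 0')      # success: node.setSatisfied(True)
    endrun(node, 'EXIT 1')      # failure: logged, node.setSatisfied(False)
    endrun(node, 'UNSUBMITTED') # submission failed: node.setSatisfied(False)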
class PwatcherTaskQueue(object):
    def enque(self, nodes):
        """This should be an injected dependency.
        Yield any Nodes that could not be submitted.
        Nodes that lacked a script were actually run in-process and are considered
        successful unless they raised an Exception, so they go into the to_report set.
        """
        # Start all "nodes".
        # Note: It is safe to run this block always, but we save a
        # call to pwatcher with 'if ready'.
        LOG.debug('enque nodes:\n%s' % pprint.pformat(nodes))
        jobids = dict()
        for node in nodes:
            #node.satisfy() # This would do the job without a process-watcher.
            generated_script_fn = node.execute() # misnomer; this only dumps task.json now
            if not generated_script_fn:
                raise Exception('Missing generated_script_fn for Node {}'.format(node))
                ## This task is done.
                #endrun(node, 'EXIT 0')
                #self.__to_report.append(node)
                #continue
            # For now, consider it as "submitted" and finished.
            # (It would throw exception on error.)
            jobid = self.__generate_jobid(node, generated_script_fn)
            self.__known[jobid] = node
            rundir, basename = os.path.split(os.path.abspath(generated_script_fn))
            cmd = '/bin/bash {}'.format(basename)
            job_type = node.pypetask.parameters.get('job_type', None)
            dist = node.pypetask.dist # must always exist now
            job_dict = dict(self.__job_defaults_dict)
            job_dict.update(dist.job_dict)
            LOG.debug('In rundir={!r}, job_dict={!r}'.format(
                rundir, job_dict))
            #job_nproc = dist.NPROC # string or int
            #job_mb = dist.MB # string or int, megabytes
            job_local = int(dist.local) # bool->1/0, easily serialized
            jobids[jobid] = {
                'cmd': cmd,
                'rundir': rundir,
                # These are optional:
                'job_type': job_type,
                'job_dict': job_dict,
                #'job_nproc': job_nproc,
                #'job_mb': job_mb,
                'job_local': job_local,
            }
        # Also send the default type and queue-name.
        watcher_args = {
            'jobids': jobids,
            'job_type': self.__job_type,
            'job_defaults_dict': self.__job_defaults_dict,
        }
        result = self.watcher.run(**watcher_args)
        LOG.debug('Result of watcher.run()={}'.format(repr(result)))
        submitted = result['submitted']
        self.__running.update(submitted)
        for jobid in (set(jobids.keys()) - set(submitted)):
            yield self.__known[jobid] # TODO: What should be the status for these submission failures?
    def check_done(self):
        """Yield Nodes which have finished since the last check.
        """
        watcher_args = {
            'jobids': list(self.__running),
            'which': 'list',
        }
        q = self.watcher.query(**watcher_args)
        #LOG.debug('In check_done(), result of query:%s' % repr(q))
        for jobid, status in q['jobids'].items():
            if status.startswith('EXIT') or status.startswith('DEAD'):
                self.__running.remove(jobid)
                node = self.__known[jobid]
                self.__to_report.append(node)
                try:
                    endrun(node, status)
                except Exception:
                    msg = 'Failed to clean-up FakeThread: jobid={} status={}'.format(jobid, repr(status))
                    LOG.exception(msg)
                    raise
        to_report = list(self.__to_report)
        self.__to_report = list()
        return to_report
    def terminate(self):
        watcher_args = {
            'jobids': list(self.__running),
            'which': 'known',
        }
        q = self.watcher.delete(**watcher_args)
        LOG.debug('In notifyTerminate(), result of delete:%s' % repr(q))
    def __init__(self, watcher, section_job=None, job_type='local', job_defaults_dict=None, jobid_generator=0):
        self.watcher = watcher
        self.__section_job = section_job if section_job else {}
        self.__job_type = job_type
        self.__job_defaults_dict = job_defaults_dict if job_defaults_dict else {} # None would crash dict() in enque()
        self.__running = set() # jobids
        self.__known = dict() # jobid -> Node
        self.__to_report = list() # Nodes
        self.__generate_jobid = JOBID_GENERATORS[jobid_generator]
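# A sketch of the queue lifecycle, assuming 'watcher' comes from one of the
# pwatcher implementations (see PypeProcWatcherWorkflow below) and 'nodes' are
# PypeNodes; enque() yields any nodes whose submission failed.
def _example_taskqueue(watcher, nodes):
    tq = PwatcherTaskQueue(watcher=watcher, job_type='local', job_defaults_dict={})
    failed = list(tq.enque(nodes)) # submits; also writes each node's run-scripts
    done = []
    while not done and not failed:
        time.sleep(1)
        done = tq.check_done() # Nodes finished since the last poll
    tq.terminate() # ask the watcher to delete anything still running
    return done, failed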
def get_unsatisfied_subgraph(g):
    unsatg = networkx.DiGraph(g)
    for n in g.nodes():
        if n.satisfied():
            unsatg.remove_node(n)
    return unsatg
def find_all_ancestors(g):
    ancestors = dict()
    def get_ancestors(node):
        """memoized"""
        if node not in ancestors:
            myancestors = set()
            for pred in g.predecessors(node): # predecessors_iter() was dropped in networkx 2.x
                myancestors.add(pred) # the direct predecessor is itself an ancestor
                myancestors.update(get_ancestors(pred))
            ancestors[node] = myancestors
        return ancestors[node]
    for node in g.nodes():
        get_ancestors(node)
    return ancestors
def find_next_ready_and_remove(g, node):
    """Given a recently satisfied node,
    return any successors with in_degree 1.
    Then remove node from g immediately.
    """
    ready = set()
    for n in g.successors(node):
        if 1 == g.in_degree(n):
            ready.add(n)
    g.remove_node(node)
    return ready

def find_all_roots(g):
    """Find all nodes in g which have no predecessors.
    """
    roots = set()
    for node in g:
        if 0 == g.in_degree(node):
            roots.add(node)
    return roots
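# A small demonstration of the graph helpers on a toy DAG, assuming networkx 2.x.
# Edges point from a prerequisite to the task that needs it (as in Workflow.addTask).
def _example_graph_helpers():
    g = networkx.DiGraph()
    g.add_edge('a', 'b') # b needs a
    g.add_edge('a', 'c') # c needs a
    g.add_edge('b', 'c') # c also needs b
    roots = find_all_roots(g) # {'a'}: no unsatisfied prerequisites
    ready = find_next_ready_and_remove(g, 'a') # {'b'}; 'c' still waits on 'b'
    return roots, ready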
class Workflow(object):
    def _create_node(self, pypetask):
        """Given a PypeTask, return a Node for our Workflow graph.
        Recursively create more nodes based on 'pypetask.inputs',
        record them as 'node.needs', and update the global pypetask2node table.
        """
        needs = set()
        use_tmpdir = self.use_tmpdir
        pre_script = self.pre_script
        if pypetask.dist.use_tmpdir is not None:
            use_tmpdir = pypetask.dist.use_tmpdir
        node = PypeNode(pypetask.name, pypetask.wdir, pypetask, needs, use_tmpdir, pre_script)
        self.pypetask2node[pypetask] = node
        for key, plf in pypetask.inputs.items():
            if plf.producer is None:
                continue
            if plf.producer not in self.pypetask2node:
                self._create_node(plf.producer)
            needed_node = self.pypetask2node[plf.producer]
            needs.add(needed_node)
        LOG.debug('New {!r} needs {!r}'.format(node, needs))
        return node

    def addTask(self, pypetask):
        node = self._create_node(pypetask)
        sentinel_done_fn = node.sentinel_done_fn()
        if sentinel_done_fn in self.sentinels:
            msg = 'FOUND sentinel {!r} twice: {!r} ({!r}) and {!r}\nNote: Each task needs its own sentinel (and preferably its own run-dir).'.format(sentinel_done_fn, node, pypetask, self.sentinels[sentinel_done_fn])
            raise Exception(msg)
        self.sentinels[sentinel_done_fn] = node
        self.graph.add_node(node)
        for need in node.needs:
            #print("Need:", need, node)
            self.graph.add_edge(need, node)

    def addTasks(self, tlist):
        for t in tlist:
            self.addTask(t)
    def refreshTargets(self, targets=None, updateFreq=10, exitOnFailure=True):
        try:
            self._refreshTargets(updateFreq, exitOnFailure)
        except: # deliberately bare, so the queue is cleaned up even on KeyboardInterrupt
            self.tq.terminate()
            raise
    def _refreshTargets(self, updateFreq, exitOnFailure):
        """Raise Exception (eventually) on any failure.
        - updateFreq (seconds) is really a max; we climb toward it gradually, and we reset when things change.
        - exitOnFailure=False would allow us to keep running (for a while) when parallel jobs fail.
        - self.max_jobs: the max number of concurrently running jobs
          (possibly this should be the number of cpus in use, but for now it is qsub jobs, for simplicity).
        """
        # Note: With the 'blocking' pwatcher, we will have 1 thread per live qsub job.
        # If/when that becomes a problem, please re-write the pwatcher as Go or Nim.
        # This loop is single-threaded. If we ignore max_jobs,
        # then we would send large queries for no reason, but that is not really a big deal.
        # The Python 'blocking' pwatcher is the real reason to limit jobs, for now.
        assert networkx.algorithms.dag.is_directed_acyclic_graph(self.graph)
        assert isinstance(self.max_jobs, int)
        failures = 0
        unsatg = get_unsatisfied_subgraph(self.graph)
        ready = find_all_roots(unsatg)
        submitted = set()
        init_sleep_time = 0.1
        sleep_time = init_sleep_time
        slept_seconds_since_last_ping = 0.0
        num_iterations_since_last_ping = 0
        num_iterations_since_last_ping_max = 1
        LOG.info('Num unsatisfied: {}, graph: {}'.format(len(unsatg), len(self.graph)))
        while ready or submitted:
            # Nodes cannot be in ready or submitted unless they are also in unsatg.
            to_submit = set()
            if self.max_jobs <= 0:
                msg = 'self.max_jobs={}'.format(self.max_jobs)
                raise Exception(msg)
            while ready and (self.max_jobs > len(submitted) + len(to_submit)):
                node = ready.pop()
                to_submit.add(node)
                LOG.info('About to submit: {!r}'.format(node))
            if to_submit:
                unsubmitted = set(self.tq.enque(to_submit)) # In theory, this traps exceptions.
                if unsubmitted:
                    msg = 'Failed to enqueue {} of {} jobs: {!r}'.format(
                        len(unsubmitted), len(to_submit), unsubmitted)
                    LOG.error(msg)
                    #ready.update(unsubmitted) # Resubmit only in pwatcher, if at all.
                    raise Exception(msg)
                submitted.update(to_submit - unsubmitted)
            LOG.debug('N in queue: {} (max_jobs={})'.format(len(submitted), self.max_jobs))
            recently_done = set(self.tq.check_done())
            num_iterations_since_last_ping += 1
            if not recently_done:
                if not submitted:
                    LOG.warning('Nothing is happening, and we had {} failures. Should we quit? Instead, we will just sleep.'.format(failures))
                    #break
                if num_iterations_since_last_ping_max <= num_iterations_since_last_ping:
                    # Ping!
                    LOG.info('(slept for another {}s -- another {} loop iterations)'.format(
                        slept_seconds_since_last_ping, num_iterations_since_last_ping))
                    slept_seconds_since_last_ping = 0.0
                    num_iterations_since_last_ping = 0
                    num_iterations_since_last_ping_max += 1
                slept_seconds_since_last_ping += sleep_time
                time.sleep(sleep_time)
                sleep_time = sleep_time + 0.1 if (sleep_time < updateFreq) else updateFreq
                continue
            LOG.debug('recently_done: {!r}'.format([(rd, rd.satisfied()) for rd in recently_done]))
            LOG.debug('Num done in this iteration: {}'.format(len(recently_done)))
            sleep_time = init_sleep_time
            submitted -= recently_done
            recently_satisfied = set(n for n in recently_done if n.satisfied())
            recently_done -= recently_satisfied
            #LOG.debug('Now N recently_done: {}'.format(len(recently_done)))
            for node in recently_satisfied:
                ready.update(find_next_ready_and_remove(unsatg, node))
            if recently_satisfied:
                LOG.info('recently_satisfied:\n{}'.format(pprint.pformat(recently_satisfied)))
                LOG.info('Num satisfied in this iteration: {}'.format(len(recently_satisfied)))
                LOG.info('Num still unsatisfied: {}'.format(len(unsatg)))
            if recently_done:
                msg = 'Some tasks are recently_done but not satisfied: {!r}'.format(recently_done)
                LOG.error(msg)
                LOG.error('ready: {!r}\n\tsubmitted: {!r}'.format(ready, submitted))
                failures += len(recently_done)
                if exitOnFailure:
                    raise Exception(msg)
        assert not submitted
        assert not ready
        if failures:
            raise Exception('We had {} failures. {} tasks remain unsatisfied.'.format(
                failures, len(unsatg)))
    @property
    def max_jobs(self):
        return self.__max_njobs

    @max_jobs.setter
    def max_jobs(self, val):
        pre = self.__max_njobs
        if pre != int(val):
            self.__max_njobs = int(val)
            LOG.info('Setting max_jobs to {}; was {}'.format(self.__max_njobs, pre))
    def __init__(self, watcher, job_type, job_defaults_dict, max_jobs, use_tmpdir, squash, jobid_generator,
                 pre_script=None,
                 ):
        self.graph = networkx.DiGraph()
        # TODO: Inject PwatcherTaskQueue
        self.tq = PwatcherTaskQueue(watcher=watcher, job_type=job_type, job_defaults_dict=job_defaults_dict,
                                    jobid_generator=jobid_generator,
                                    )
        assert isinstance(max_jobs, int)
        assert max_jobs > 0, 'max_jobs needs to be set. If you use the "blocking" process-watcher, it is also the number of threads.'
        self.__max_njobs = None
        self.max_jobs = max_jobs
        self.sentinels = dict() # sentinel_done_fn -> Node
        self.pypetask2node = dict()
        self.use_tmpdir = use_tmpdir
        if not pre_script:
            pre_script = os.environ.get('PYPEFLOW_PRE', None)
            if pre_script:
                LOG.warning('Found PYPEFLOW_PRE in env; using that for pre_script.')
        self.pre_script = pre_script # command(s) to run at top of bash scripts
        if self.pre_script:
            LOG.info('At top of bash scripts, we will run:\n{}'.format(self.pre_script))
        self.squash = squash # This really should depend on each Task, but for now a global is fine.
                             # For small genomes, serial tasks should always be squashed.
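# A sketch of driving a Workflow by hand (normally PypeProcWatcherWorkflow, at
# the bottom of this module, does this wiring). 'watcher' and 'pypetasks' are
# assumed to exist; all argument values here are illustrative.
def _example_workflow(watcher, pypetasks):
    wf = Workflow(watcher, job_type='local', job_defaults_dict={}, max_jobs=4,
                  use_tmpdir=None, squash=False, jobid_generator=0)
    wf.addTasks(pypetasks)
    wf.refreshTargets() # blocks until all tasks are satisfied, or raises
    return wf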
class NodeBase(object):
    """Graph node.
    Can be satisfied on demand, but usually we call execute() and later run the script.
    """
    def setSatisfied(self, s):
        self.__satisfied = s

    def workdir(self):
        return self.wdir

    def script_fn(self):
        return os.path.join(self.workdir(), 'run.sh')

    def sentinel_done_fn(self):
        return self.script_fn() + '.done'

    def satisfied(self):
        """Not just done, but successful.
        If we see the sentinel file, then we memoize True.
        But if we finished a distributed job with exit-code 0, we do not need
        to wait for the sentinel; we know we had success.
        """
        #LOG.debug('Checking satisfied({!r}) for sentinel {!r}'.format(self, self.sentinel_done_fn()))
        if self.__satisfied is not None:
            return self.__satisfied
        if os.path.exists(self.sentinel_done_fn()):
            self.setSatisfied(True)
        return self.__satisfied == True

    def satisfy(self):
        if self.__satisfied:
            return
        # Technically, we might need to cd into wdir first.
        script_fn = self.execute()
        if script_fn:
            run(script_fn)
        self.__satisfied = True

    def execute(self):
        try:
            actual_script_fn = self.generate_script()
        except Exception:
            LOG.exception('Failed generate_script() for {!r}'.format(self))
            raise
        sentinel_done_fn = self.sentinel_done_fn()
        wdir = self.workdir()
        rel_actual_script_fn = os.path.relpath(actual_script_fn, wdir)
        wrapper = """#!/bin/sh
set -vex
export PATH=$PATH:/bin
cd {wdir}
/bin/bash {rel_actual_script_fn}
touch {sentinel_done_fn}
""".format(**locals())
        wrapper_fn = self.script_fn()
        #mkdirs(os.path.dirname(wrapper_fn))
        with open(wrapper_fn, 'w') as ofs:
            ofs.write(wrapper)
        return wrapper_fn

    def generate_script(self):
        raise NotImplementedError(repr(self))

    def __repr__(self):
        return 'Node({})'.format(self.name)

    def __init__(self, name, wdir, needs):
        self.__satisfied = None # satisfiable
        self.name = name
        self.wdir = wdir
        self.needs = needs
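# For a node with a hypothetical wdir '/work/0-rawreads', execute() above writes
# a wrapper '/work/0-rawreads/run.sh' roughly like:
#
#   #!/bin/sh
#   set -vex
#   export PATH=$PATH:/bin
#   cd /work/0-rawreads
#   /bin/bash task.sh
#   touch /work/0-rawreads/run.sh.done
#
# The final 'touch' creates the sentinel that satisfied() checks.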
class PypeNode(NodeBase):
    """
    We will clean this up later. For now, it is pretty tightly coupled to PypeTask.
    """
    def generate_script(self):
        wdir = os.path.normpath(self.wdir)
        mkdirs(wdir)
        pt = self.pypetask
        assert pt.wdir == self.wdir
        inputs = {k: os.path.relpath(v.path, wdir) for k, v in pt.inputs.items()}
        outputs = {k: os.path.relpath(v.path, wdir) for k, v in pt.outputs.items()}
        for v in outputs.values():
            assert not os.path.isabs(v), '{!r} is not relative'.format(v)
        bash_template = pt.bash_template
        LOG.debug("====>inputs={},\n====>outputs={},\n====>bash_template={}"
                  .format(inputs, outputs, bash_template))
        if bash_template is not None:
            # Write bash_template.
            bash_script_fn = os.path.join(wdir, 'template.sh')
            with open(bash_script_fn, 'w') as ofs:
                message = '# Substitution will be similar to snakemake "shell".\n'
                if self.pre_script:
                    message += self.pre_script + '\n'
                ofs.write(message + bash_template)
            task_desc = {
                'inputs': inputs,
                'outputs': outputs,
                'parameters': pt.parameters,
                'bash_template_fn': 'template.sh',
            }
        else:
            raise Exception('We no longer support python functions as PypeTasks.')
        task_content = json.dumps(task_desc, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
        task_json_fn = os.path.join(wdir, 'task.json')
        with open(task_json_fn, 'w') as ofs:
            ofs.write(task_content)
        python = 'python3' # sys.executable fails sometimes because of binwrapper: SE-152
        tmpdir_flag = '--tmpdir {}'.format(self.use_tmpdir) if self.use_tmpdir else ''
        cmd = '{} -m pypeflow.do_task {} {}'.format(python, tmpdir_flag, task_json_fn)
        env_setup = 'env | sort'
        if self.pre_script:
            env_setup = self.pre_script + '\n' + env_setup
        script_content = """#!/bin/bash
onerror () {{
    set -vx
    echo "FAILURE. Running top in $(pwd) (If you see -terminal database is inaccessible- you are using the python bin-wrapper, so you will not get diagnostic info. No big deal. This process is crashing anyway.)"
    rm -f top.txt
    which python
    which top
    env -u LD_LIBRARY_PATH top -b -n 1 >| top.txt &
    env -u LD_LIBRARY_PATH top -b -n 1 2>&1
    pstree -apl
}}
trap onerror ERR
{env_setup}
echo "HOSTNAME=$(hostname)"
echo "PWD=$(pwd)"
time {cmd}
""".format(**locals())
        script_fn = os.path.join(wdir, 'task.sh')
        with open(script_fn, 'w') as ofs:
            ofs.write(script_content)
        return script_fn
    def old_generate_script(self):
        pt = self.pypetask
        func = pt.func
        func(pt) # Run the function! Probably just generate a script.
        try:
            # Maybe we should require a script always.
            generated_script_fn = pt.generated_script_fn # by convention
        except AttributeError:
            LOG.exception('name={!r} URL={!r}'.format(pt.name, pt.URL))
            raise
        return generated_script_fn

    def __init__(self, name, wdir, pypetask, needs, use_tmpdir, pre_script):
        super(PypeNode, self).__init__(name, wdir, needs)
        self.pypetask = pypetask
        self.use_tmpdir = use_tmpdir
        self.pre_script = pre_script
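# For a hypothetical task, generate_script() above leaves behind '<wdir>/task.json'
# roughly like (keys sorted by json.dumps):
#
#   {
#       "bash_template_fn": "template.sh",
#       "inputs": {"fofn": "../raw.fofn"},
#       "outputs": {"db": "raw.db"},
#       "parameters": {}
#   }
#
# and '<wdir>/task.sh', which runs 'python3 -m pypeflow.do_task task.json' there.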
# This global exists only because we continue to support the old PypeTask style,
# where a PypeLocalFile does not know the PypeTask which produces it.
# (This also allows us to specify PypeTasks out of order, fwiw.)
# Someday, we might require PypeTasks to depend on outputs of other PypeTasks explicitly;
# then we can drop this dict.
PRODUCERS = dict()

def findPypeLocalFile(path):
    """Look-up based on tail dirname.
    Do not call this for paths relative to their work-dirs.
    """
    assert os.path.isabs(path)
    basename = os.path.basename(path)
    basedir = os.path.relpath(os.path.dirname(path))
    producer = PRODUCERS.get(basedir)
    if producer is None:
        msg = 'No producer PypeTask for basedir {!r} from path {!r} -- Pure input?'.format(
            basedir, path)
        LOG.debug(msg)
        return PypeLocalFile(path, None)
    siblings = producer.outputs
    for plf in siblings.values():
        sibling_basename = os.path.basename(plf.path)
        if sibling_basename == basename:
            return plf
    raise Exception('Failed to find a PypeLocalFile for {!r} among outputs of {!r}\n\t(Note that pure inputs must not be in task directories.)'.format(
        path, producer))
def find_work_dir(paths):
    """Return the absolute path to the directory containing all these paths.
    It must be the same for all of them.
    """
    dirnames = set(os.path.dirname(os.path.normpath(p)) for p in paths)
    if len(dirnames) != 1:
        raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {} in {}'.format(
            pprint.pformat(paths), pprint.pformat(dirnames)))
    d = dirnames.pop()
    return os.path.abspath(d)
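# A minimal sketch (hypothetical paths): all outputs of a task must share one
# directory, which becomes the task's absolute work-dir.
def _example_find_work_dir():
    return find_work_dir(['0-rawreads/raw.db', '0-rawreads/lengths.json'])
    # -> os.path.abspath('0-rawreads'); paths in mixed dirs would raise instead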
class PypeLocalFile(object):
    def __repr__(self):
        if self.producer:
            path = os.path.relpath(self.path, self.producer.wdir)
            return 'PLF({!r}, {!r})'.format(path, os.path.relpath(self.producer.wdir))
        else:
            path = os.path.relpath(self.path)
            return 'PLF({!r}, {!r})'.format(path, None)

    def __init__(self, path, producer):
        self.path = os.path.abspath(path)
        self.producer = producer

def makePypeLocalFile(p, producer=None):
    return PypeLocalFile(p, producer)

def fn(p):
    """This must be run in the top run-dir.
    All task funcs are executed there.
    """
    if isinstance(p, PypeLocalFile):
        p = p.path
    return os.path.abspath(p)

def only_path(p):
    if isinstance(p, PypeLocalFile):
        return p.path
    else:
        return p
class Dist(object):
    @property
    def pypeflow_nproc(self):
        return self.job_dict['NPROC']

    @property
    def pypeflow_mb(self):
        """Megabytes per processor."""
        return self.job_dict['MB']

    def __init__(self, NPROC=1, MB=4000, local=False, job_dict=None, use_tmpdir=None):
        if job_dict is None:
            job_dict = {} # avoid a shared mutable default argument
        my_job_dict = {}
        if local:
            # Keep it simple. If we run local-only, then do not even bother with tmpdir.
            # This helps with simpler scatter/gather tasks, which copy paths.
            use_tmpdir = False
        my_job_dict['NPROC'] = NPROC
        my_job_dict['MB'] = MB
        my_job_dict.update(job_dict) # user overrides everything
        self.job_dict = my_job_dict
        self.local = local
        self.use_tmpdir = use_tmpdir
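# A minimal sketch of Dist: a job wanting 4 processors and 8000 MB per processor.
# The 'queue' key is hypothetical; it simply passes through job_dict to the
# process-watcher, and user-supplied keys override NPROC/MB if repeated.
def _example_dist():
    dist = Dist(NPROC=4, MB=8000, job_dict={'queue': 'production'})
    return dist.pypeflow_nproc, dist.pypeflow_mb # (4, 8000)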
def PypeTask(inputs, outputs, parameters=None, wdir=None, bash_template=None, dist=None):
    """A slightly messy factory, because we want to support both strings and PypeLocalFiles, for now.
    This can alter dict values in inputs/outputs if they were not already PypeLocalFiles.
    """
    if dist is None:
        dist = Dist()
    LOG.debug('New PypeTask(wdir={!r},\n\tinputs={!r},\n\toutputs={!r})'.format(
        wdir, inputs, outputs))
    if wdir is None:
        #wdir = parameters.get('wdir', name) # One of these must be a string!
        wdir = find_work_dir([only_path(v) for v in outputs.values()])
        # Since we derived wdir from outputs, any relative paths should become absolute.
        for k, v in list(outputs.items()):
            if not isinstance(v, PypeLocalFile) and not os.path.isabs(v):
                outputs[k] = os.path.abspath(v)
        for k, v in list(inputs.items()):
            if not isinstance(v, PypeLocalFile) and not os.path.isabs(v):
                inputs[k] = os.path.abspath(v)
    else:
        # Relative paths are relative to the provided wdir.
        pass
    if not os.path.isabs(wdir):
        wdir = os.path.abspath(wdir)
    this = _PypeTask(inputs, outputs, wdir, parameters, bash_template, dist)
    #basedir = os.path.basename(wdir)
    basedir = this.name
    if basedir in PRODUCERS:
        raise Exception('Basedir {!r} already used for {!r}. Cannot create new PypeTask {!r}.'.format(
            basedir, PRODUCERS[basedir], this))
    LOG.debug('Added PRODUCERS[{!r}] = {!r}'.format(basedir, this))
    PRODUCERS[basedir] = this
    this.canonicalize() # Already abs, but make everything a PLF.
    this.assert_canonical()
    LOG.debug('Built {!r}'.format(this))
    return this
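# A sketch of building a task (hypothetical paths and template). Relative
# outputs determine the wdir; the bash_template is substituted snakemake-style
# by pypeflow.do_task, so {input.fofn} and {output.db} refer to the dicts below.
def _example_pypetask():
    return PypeTask(
        inputs={'fofn': '/abs/data/raw.fofn'}, # pure inputs must be absolute
        outputs={'db': '0-rawreads/raw.db'},   # wdir becomes <cwd>/0-rawreads
        parameters={},
        bash_template='python3 build_db.py {input.fofn} {output.db}',
        dist=Dist(NPROC=1, local=True),
    )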
class _PypeTask(object):
    """Adaptor from the old PypeTask API.
    """
    def canonicalize(self):
        for key, val in list(self.outputs.items()):
            if not isinstance(val, PypeLocalFile):
                # If relative, then it is relative to our wdir.
                #LOG.warning('Making PLF: {!r} {!r}'.format(val, self))
                if not os.path.isabs(val):
                    val = os.path.join(self.wdir, val)
                    #LOG.warning('Making PLF: {!r} {!r}'.format(val, self))
                val = PypeLocalFile(val, self)
                self.outputs[key] = val
            else:
                val.producer = self
        for key, val in list(self.inputs.items()):
            if not isinstance(val, PypeLocalFile):
                assert os.path.isabs(val), 'Inputs cannot be relative at this point: {!r} in {!r}'.format(val, self)
                self.inputs[key] = findPypeLocalFile(val)

    def assert_canonical(self):
        # Output values will be updated after PypeTask init, so refer back to self as producer.
        for k, v in self.inputs.items():
            assert os.path.isabs(v.path), 'For {!r}, input {!r} is not absolute'.format(self.wdir, v)
        for k, v in self.outputs.items():
            assert os.path.isabs(v.path), 'For {!r}, output {!r} is not absolute'.format(self.wdir, v)
        common = set(self.inputs.keys()) & set(self.outputs.keys())
        assert not common, 'A key is used for both inputs and outputs of PypeTask({}), which could be ok, but only if we refer to them as input.foo and output.foo in the bash script: {!r}'.format(self.wdir, common)
    def __call__(self, func):
        self.func = func
        self.__name__ = '{}.{}'.format(func.__module__, func.__name__)
        return self

    def __repr__(self):
        return 'PypeTask({!r}, {!r}, {!r}, {!r})'.format(self.name, self.wdir, pprint.pformat(self.outputs), pprint.pformat(self.inputs))

    def __init__(self, inputs, outputs, wdir, parameters, bash_template, dist):
        if parameters is None:
            parameters = {}
        name = os.path.relpath(wdir)
        URL = 'task://localhost/{}'.format(name)
        self.inputs = inputs
        self.outputs = outputs
        self.parameters = dict(parameters) # Always copy this, so the caller can re-use it, for convenience.
        self.bash_template = bash_template
        self.wdir = wdir
        self.name = name
        self.URL = URL
        self.dist = dist
        #for key, bn in inputs.items():
        #    setattr(self, key, os.path.abspath(bn))
        #for key, bn in outputs.items():
        #    setattr(self, key, os.path.abspath(bn))
        assert self.wdir, 'No wdir for {!r} {!r}'.format(self.name, self.URL)
        LOG.debug('Created {!r}'.format(self))
MyFakePypeThreadTaskBase = None # just a symbol, not really used

# Here is the main factory.
def PypeProcWatcherWorkflow(
        URL=None,
        job_defaults=dict(),
        squash=False,
        **attributes):
    """Factory for the workflow.
    """
    job_type = job_defaults.get('job_type', 'local')
    job_name_style = job_defaults.get('job_name_style', '')
    watcher_type = job_defaults.get('pwatcher_type', 'fs_based')
    watcher_directory = job_defaults.get('pwatcher_directory', 'mypwatcher')
    use_tmpdir = job_defaults.get('use_tmpdir', None)
    max_jobs = int(job_defaults.get('njobs', 24)) # must be > 0, but not too high
    if watcher_type == 'blocking':
        pwatcher_impl = pwatcher.blocking
    elif watcher_type == 'network_based':
        pwatcher_impl = pwatcher.network_based
    else:
        pwatcher_impl = pwatcher.fs_based
    LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
    watcher = pwatcher_impl.get_process_watcher(watcher_directory)
    if use_tmpdir:
        try:
            if not os.path.isabs(use_tmpdir):
                use_tmpdir = os.path.abspath(use_tmpdir)
        except (TypeError, AttributeError):
            use_tmpdir = tempfile.gettempdir()
    if not job_name_style:
        job_name_style = 0
    LOG.info('job_type={!r}, (default)job_defaults={!r}, use_tmpdir={!r}, squash={!r}, job_name_style={!r}'.format(
        job_type, job_defaults, use_tmpdir, squash, job_name_style,
    ))
    jobid_generator = int(job_name_style)
    return Workflow(watcher,
                    job_type=job_type, job_defaults_dict=job_defaults, max_jobs=max_jobs, use_tmpdir=use_tmpdir,
                    squash=squash, jobid_generator=jobid_generator,
                    )
    #th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue)
    #mq = MyMessageQueue()
    #se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
    #return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
    #        attributes=attributes)
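# A typical invocation of the factory (values illustrative). The returned
# Workflow then accepts tasks and runs them to completion:
#
#   wf = PypeProcWatcherWorkflow(job_defaults={
#       'job_type': 'local',
#       'pwatcher_type': 'blocking', # or 'fs_based' (default) or 'network_based'
#       'njobs': 8, # max concurrently running jobs
#   })
#   wf.addTasks([...])
#   wf.refreshTargets()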
__all__ = ['PypeProcWatcherWorkflow', 'fn', 'makePypeLocalFile', 'MyFakePypeThreadTaskBase', 'PypeTask']