123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- """Blocking process-watcher.
- See fs_based.py. Here, delete is a no-op, and run() starts threads, so
- the main program needs to wait for threads to finish somehow.
- Typical submission_string:
- qsub -S /bin/bash -sync y -V -q production -N ${JOB_ID} \\\n -o "${STDOUT_FILE}" \\\n -e "${STDERR_FILE}" \\\n -pe smp ${NPROC} -l h_vmem=${MB}M \\\n "${CMD}"
- """
- try:
- from shlex import quote
- except ImportError:
- from pipes import quote
- import collections
- import contextlib
- import copy
- import glob
- import json
- import logging
- import os
- import pprint
- import re
- import signal
- import string
- import subprocess
- import sys
- import threading
- import time
- import traceback
- log = logging.getLogger(__name__)
- LOCAL_SUBMISSION_STRING = '/bin/bash -C ${CMD} >| ${STDOUT_FILE} 2>| ${STDERR_FILE}' # for job_local override
- STATE_FN = 'state.py'
- Job = collections.namedtuple('Job', ['jobid', 'cmd', 'rundir', 'options'])
- MetaJob = collections.namedtuple('MetaJob', ['job', 'lang_exe'])
- lang_python_exe = sys.executable
- lang_bash_exe = '/bin/bash'
- @contextlib.contextmanager
- def cd(newdir):
- prevdir = os.getcwd()
- log.debug('CD: %r <- %r' %(newdir, prevdir))
- os.chdir(os.path.expanduser(newdir))
- try:
- yield
- finally:
- log.debug('CD: %r -> %r' %(newdir, prevdir))
- os.chdir(prevdir)
- class MetaJobClass(object):
- ext = {
- lang_python_exe: '.py',
- lang_bash_exe: '.bash',
- }
- def get_wrapper(self):
- # Totally by convention, for now.
- return '%s/run-%s%s' %(self.mj.job.rundir, self.mj.job.jobid, self.ext[self.mj.lang_exe])
- def get_sentinel(self):
- return 'exit-%s' %self.mj.job.jobid # in watched dir
- def get_pid(self):
- return self.mj.pid
- def kill(self, pid, sig):
- stored_pid = self.get_pid()
- if not pid:
- pid = stored_pid
- log.info('Not passed a pid to kill. Using stored pid:%s' %pid)
- if pid and stored_pid:
- if pid != stored_pid:
- log.error('pid:%s != stored_pid:%s' %(pid, stored_pid))
- os.kill(pid, sig)
- def __init__(self, mj):
- self.mj = mj
- class State(object):
- def notify_threaded(self, jobid):
- self.jobids_threaded.add(jobid)
- def notify_started(self, jobid):
- #state.top['jobids_submitted'].append(jobid)
- self.jobids_submitted.add(jobid)
- self.jobids_threaded.remove(jobid)
- log.debug('Thread notify_started({}).'.format(jobid))
- def notify_exited(self, jobid, rc):
- #self.top['jobid2exit'][jobid] = rc
- self.jobid2exit[jobid] = rc
- self.jobids_submitted.remove(jobid)
- log.debug('Thread notify_exited({}->{}).'.format(jobid, rc))
- def set_job(self, jobid, mjob):
- # Is this needed? For now, we are not actually saving state, so no.
- self.top['jobs'][jobid] = mjob
- def update_jobid2status(self, jobid2status):
- for jobid in self.jobids_threaded:
- status = 'THREADED'
- jobid2status[jobid] = status
- for jobid in self.jobids_submitted:
- status = 'RUNNING'
- # but actually it might not have started yet, or it could be dead, since we have blocking qsub calls
- jobid2status[jobid] = status
- for jobid, rc in self.jobid2exit.items():
- status = 'EXIT {}'.format(rc)
- jobid2status[jobid] = status
- def get_running_jobids(self):
- return list(self.jobids_submitted)
- def serialize(self):
- return pprint.pformat(self.top)
- @staticmethod
- def deserialize(directory, content):
- state = State(directory)
- state.top = eval(content)
- state.content_prev = content
- return state
- @staticmethod
- def create(directory):
- state = State(directory)
- #makedirs(state.get_directory_wrappers())
- #makedirs(state.get_directory_jobs())
- return state
- def __init__(self, directory):
- self.__directory = os.path.abspath(directory)
- self.content_prev = ''
- self.top = dict() # for serialization, when we decide we need it
- self.top['jobs'] = dict()
- #self.top['jobids_submitted'] = list()
- #self.top['jobid2exit'] = dict()
- self.jobids_threaded = set()
- self.jobids_submitted = set()
- self.jobid2exit = dict()
- class SafeState(object):
- """Synchronized State proxy for accessing any
- data which might be modified in a Thread.
- """
- def notify_threaded(self, jobid):
- with self.lock:
- self.state.notify_threaded(jobid)
- def notify_started(self, jobid):
- with self.lock:
- self.state.notify_started(jobid)
- def notify_exited(self, jobid, rc):
- with self.lock:
- self.state.notify_exited(jobid, rc)
- def update_jobid2status(self, table):
- with self.lock:
- return self.state.update_jobid2status(table)
- def get_running_jobids(self):
- with self.lock:
- return self.state.get_running_jobids()
- def serialize(self):
- with self.lock:
- return self.state.serialize()
- def __getattr__(self, name):
- """For all other methods, just delegate.
- """
- return getattr(self.state, name)
- def __init__(self, state):
- self.state = state
- self.lock = threading.Lock()
- def get_state(directory):
- """For now, we never write.
- """
- state_fn = os.path.join(directory, STATE_FN)
- if not os.path.exists(state_fn):
- return State.create(directory)
- assert False, 'No state directory needed, for now.'
- try:
- return State.deserialize(directory, open(state_fn).read())
- except Exception:
- log.exception('Failed to read state "%s". Ignoring (and soon over-writing) current state.'%state_fn)
- # TODO: Backup previous STATE_FN?
- return State(directory)
- def State_save(state):
- # TODO: RW Locks, maybe for runtime of whole program.
- content = state.serialize()
- content_prev = state.content_prev
- if content == content_prev:
- return
- fn = state.get_state_fn()
- open(fn, 'w').write(content)
- log.debug('saved state to %s' %repr(os.path.abspath(fn)))
- def Job_get_MetaJob(job, lang_exe=lang_bash_exe):
- return MetaJob(job, lang_exe=lang_exe)
- def MetaJob_wrap(mjob, state):
- """Write wrapper contents to mjob.wrapper.
- """
- metajob_rundir = mjob.job.rundir
- wdir = metajob_rundir
- bash_template = """#!%(lang_exe)s
- cmd="%(cmd)s"
- rundir="%(rundir)s"
- finish() {
- echo "finish code: $?"
- }
- trap finish 0
- #printenv
- echo
- set -ex
- while [ ! -d "$rundir" ]; do sleep 1; done
- cd "$rundir"
- eval "$cmd"
- """
- mji = MetaJobClass(mjob)
- wrapper_fn = os.path.join(wdir, mji.get_wrapper())
- command = mjob.job.cmd
- wrapped = bash_template %dict(
- lang_exe=mjob.lang_exe,
- cmd=command,
- rundir=metajob_rundir,
- )
- log.debug('Writing wrapper "%s"' %wrapper_fn)
- open(wrapper_fn, 'w').write(wrapped)
- st = os.stat(wrapper_fn)
- os.chmod(wrapper_fn, st.st_mode | 0o111)
- class JobThread(threading.Thread):
- def run(self):
- """Propagate environment, plus env_extra.
- """
- try:
- self.notify_start(self.jobname)
- log.debug('====>hello! started Thread {}'.format(threading.current_thread()))
- myenv = dict(os.environ)
- myenv.update(self.env_extra)
- #log.debug('myenv:\n{}'.format(pprint.pformat(myenv)))
- log.info("====>Popen: '{}'".format(self.cmd))
- if not self.cmd:
- msg = 'Why is self.cmd empty? {} {} {!r}'.format(self, self.jobname, self.cmd)
- raise Exception(msg)
- p = subprocess.Popen(self.cmd, env=myenv, shell=True)
- log.debug("====>pid: {}".format(p.pid))
- p.wait()
- rc = p.returncode
- log.debug("====>rc: {}".format(rc))
- self.notify_exit(self.jobname, rc)
- except:
- log.exception('Failed to submit {}: {!r} Setting rc=42.'.format(self.jobname, self.cmd))
- self.notify_exit(self.jobname, 42)
- def __init__(self, jobname, cmd, notify_start, notify_exit, env_extra):
- super(JobThread, self).__init__()
- self.jobname = jobname
- self.cmd = cmd
- self.notify_start = notify_start
- self.notify_exit = notify_exit
- self.env_extra = env_extra
- class StringJobSubmitter(object):
- """Substitute some variables into self.submission_string.
- Use mains/job_start.sh as the top script. That requires
- PYPEFLOW_JOB_START_SCRIPT in the environment as the real
- script to run. This way, we are guaranteed that the top script exists,
- and we can wait for the rest to appear in the filesystem.
- """
- def submit(self, jobname, mjob, state):
- """Prepare job (based on wrappers) and submit as a new thread.
- """
- state.set_job(jobname, mjob)
- jobname = mjob.job.jobid
- job_dict = mjob.job.options
- #nproc = mjob.job.options['NPROC']
- #mb = mjob.job.options['MB']
- mji = MetaJobClass(mjob)
- #script_fn = os.path.join(state.get_directory_wrappers(), mji.get_wrapper())
- script_fn = mji.get_wrapper()
- exe = mjob.lang_exe
- state.notify_threaded(jobname)
- self.start(jobname, state, exe, script_fn, job_dict) # Can raise
- def get_cmd(self, job_name, script_fn, job_dict):
- """Vars:
- """
- # We wrap in a program that waits for the executable to exist, so
- # the filesystem has time to catch up on the remote machine.
- # Hopefully, this will allow dependencies to become ready as well.
- job_start_fn = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mains/job_start.sh')
- mapping = dict()
- stdout = script_fn + '.stdout'
- stderr = script_fn + '.stderr'
- run_dir = os.getcwd()
- mapping = dict(
- JOB_EXE='/bin/bash',
- JOB_NAME=job_name, JOB_ID=job_name,
- #JOB_QUEUE=job_queue,
- JOB_SCRIPT=job_start_fn, CMD=job_start_fn,
- JOB_DIR=run_dir, DIR=run_dir,
- JOB_STDOUT=stdout, STDOUT_FILE=stdout,
- JOB_STDERR=stderr, STDERR_FILE=stderr,
- #MB=pypeflow_mb,
- #NPROC=pypeflow_nproc,
- )
- mapping.update(job_dict)
- if 'JOB_OPTS' in mapping:
- # a special two-level mapping: ${JOB_OPTS} is substituted first
- mapping['JOB_OPTS'] = self.sub(mapping['JOB_OPTS'], mapping)
- return self.sub(self.submission_string, mapping)
- @staticmethod
- def sub(template, mapping):
- t = string.Template(template)
- try:
- return t.substitute(mapping)
- except KeyError:
- print(repr(mapping))
- msg = 'Template substitution failed:\n template={!r}\n mapping={}'.format(
- template, pprint.pformat(mapping))
- log.exception(msg)
- raise
- def start(self, jobname, state, exe, script_fn, job_dict):
- """Run job in thread.
- Thread will notify state.
- Can raise.
- """
- #cmd = script_fn
- cmd = self.get_cmd(jobname, script_fn, job_dict)
- # job_start.sh relies on PYPEFLOW_*
- env_extra = {
- }
- log.debug('env_extra={}'.format(pprint.pformat(env_extra)))
- notify_start = state.notify_started
- notify_exit = state.notify_exited
- th = JobThread(jobname, cmd, notify_start, notify_exit, env_extra)
- #th.setDaemon(True)
- th.start()
- def __repr__(self):
- return 'StringJobSubmitter(%s)' %repr(self.submission_string)
- def __init__(self, submission_string):
- self.submission_string = submission_string
- def link_rundir(state_rundir, user_rundir):
- if user_rundir:
- link_fn = os.path.join(user_rundir, 'pwatcher.dir')
- if os.path.lexists(link_fn):
- os.unlink(link_fn)
- os.symlink(os.path.abspath(state_rundir), link_fn)
- def cmd_run(state, jobids, job_type, job_dict):
- """
- Wrap them and run them locally, each in the foreground of a thread.
- """
- jobs = dict()
- submitted = list()
- result = {'submitted': submitted}
- if job_type != 'string':
- log.debug("NOTE: In blocking pwatcher, job_type={!r}, should be 'string'".format(job_type))
- for jobid, desc in jobids.items():
- assert 'cmd' in desc
- cmd = desc['cmd']
- if 'rundir' in desc:
- rundir = desc['rundir']
- else:
- rundir = os.path.dirname(cmd)
- # These are all required now.
- #nproc = desc['job_nproc']
- #mb = desc['job_mb']
- local = int(desc['job_local'])
- options = copy.deepcopy(desc['job_dict']) #dict(NPROC=nproc, MB=mb, local=local)
- options['local'] = local
- jobs[jobid] = Job(jobid, cmd, rundir, options)
- log.debug('jobs:\n%s' %pprint.pformat(jobs))
- submission_string = job_dict['submit']
- basic_submitter = StringJobSubmitter(submission_string)
- local_submitter = StringJobSubmitter(LOCAL_SUBMISSION_STRING)
- log.debug('Basic submitter: {!r}'.format(basic_submitter))
- for jobid, job in jobs.items():
- #desc = jobids[jobid]
- log.debug(' starting job %s' %pprint.pformat(job))
- mjob = Job_get_MetaJob(job)
- MetaJob_wrap(mjob, state)
- try:
- #link_rundir(state.get_directory_job(jobid), desc.get('rundir'))
- if job.options['local']:
- submitter = local_submitter
- else:
- submitter = basic_submitter
- if not submission_string:
- raise Exception('No "submit" key in job_dict:{!r}.'.format(job_dict))
- submitter.submit(jobid, mjob, state)
- submitted.append(jobid)
- except Exception:
- raise
- log.exception('Failed to submit background-job:\n{!r}'.format(
- submitter))
- return result
- # The caller is responsible for deciding what to do about job-submission failures. Re-try, maybe?
- def system(call, checked=False):
- log.info('!{}'.format(call))
- rc = os.system(call)
- if checked and rc:
- raise Exception('{} <- {!r}'.format(rc, call))
- return rc
- _warned = dict()
- def warnonce(hashkey, msg):
- if hashkey in _warned:
- return
- log.warning(msg)
- _warned[hashkey] = True
- def cmd_query(state, which, jobids):
- """Return the state of named jobids.
- If which=='list', then query jobs listed as jobids.
- If which=='known', then query all known jobs.
- If which=='infer', same as 'known' now.
- """
- result = dict()
- jobstats = dict()
- result['jobids'] = jobstats
- if which == 'list':
- for jobid in jobids:
- jobstats[jobid] = 'UNKNOWN'
- state.update_jobid2status(jobstats)
- jobids = set(jobids)
- if which == 'list':
- for jobid in list(jobstats.keys()):
- # TODO: This might remove thousands. We should pass jobids along to update_jobid2status().
- if jobid not in jobids:
- del jobstats[jobid]
- return result
- def cmd_delete(state, which, jobids):
- """Kill designated jobs, including (hopefully) their
- entire process groups.
- If which=='list', then kill all jobs listed as jobids.
- If which=='known', then kill all known jobs.
- If which=='infer', then kill all jobs with heartbeats.
- """
- log.error('Noop. We cannot kill blocked threads. Hopefully, everything will die on SIGTERM.')
- def makedirs(path):
- if not os.path.isdir(path):
- os.makedirs(path)
- def readjson(ifs):
- """Del keys that start with ~.
- That lets us have trailing commas on all other lines.
- """
- content = ifs.read()
- log.debug('content:%s' %repr(content))
- jsonval = json.loads(content)
- #pprint.pprint(jsonval)
- def striptildes(subd):
- if not isinstance(subd, dict):
- return
- for k,v in list(subd.items()):
- if k.startswith('~'):
- del subd[k]
- else:
- striptildes(v)
- striptildes(jsonval)
- #pprint.pprint(jsonval)
- return jsonval
- class ProcessWatcher(object):
- def run(self, jobids, job_type, job_defaults_dict):
- #import traceback; log.debug(''.join(traceback.format_stack()))
- log.debug('run(jobids={}, job_type={}, job_defaults_dict={})'.format(
- '<%s>'%len(jobids), job_type, job_defaults_dict))
- return cmd_run(self.state, jobids, job_type, job_defaults_dict)
- def query(self, which='list', jobids=[]):
- log.debug('query(which={!r}, jobids={})'.format(
- which, '<%s>'%len(jobids)))
- return cmd_query(self.state, which, jobids)
- def delete(self, which='list', jobids=[]):
- log.debug('delete(which={!r}, jobids={})'.format(
- which, '<%s>'%len(jobids)))
- return cmd_delete(self.state, which, jobids)
- def __init__(self, state):
- # state must be thread-safe
- self.state = state
- def get_process_watcher(directory):
- state = get_state(directory)
- state = SafeState(state) # thread-safe proxy
- #log.debug('state =\n%s' %pprint.pformat(state.top))
- return ProcessWatcher(state)
- #State_save(state)
- @contextlib.contextmanager
- def process_watcher(directory):
- """This will (someday) hold a lock, so that
- the State can be written safely at the end.
- """
- state = get_state(directory)
- state = SafeState(state) # thread-safe proxy
- #log.debug('state =\n%s' %pprint.pformat(state.top))
- yield ProcessWatcher(state)
- #State_save(state)
- def main(prog, cmd, state_dir='mainpwatcher', argsfile=None):
- logging.basicConfig()
- logging.getLogger().setLevel(logging.NOTSET)
- log.warning('logging basically configured')
- log.debug('debug mode on')
- assert cmd in ['run', 'query', 'delete']
- ifs = sys.stdin if not argsfile else open(argsfile)
- argsdict = readjson(ifs)
- log.info('argsdict =\n%s' %pprint.pformat(argsdict))
- with process_watcher(state_dir) as watcher:
- result = getattr(watcher, cmd)(**argsdict)
- if result is not None:
- log.info('getattr({!r}, {!r}): {}'.format(
- watcher, cmd, pprint.pformat(result)))
- log.info('Waiting for running jobs...r')
- while watcher.state.get_running_jobids():
- log.info('running: {!s}'.format(watcher.state.get_running_jobids()))
- time.sleep(1)
- if __name__ == "__main__":
- #import pdb
- #pdb.set_trace()
- main(*sys.argv) # pylint: disable=no-value-for-parameter