  1. """Blocking process-watcher.
  2. See fs_based.py. Here, delete is a no-op, and run() starts threads, so
  3. the main program needs to wait for threads to finish somehow.
  4. Typical submission_string:
  5. qsub -S /bin/bash -sync y -V -q production -N ${JOB_ID} \\\n -o "${STDOUT_FILE}" \\\n -e "${STDERR_FILE}" \\\n -pe smp ${NPROC} -l h_vmem=${MB}M \\\n "${CMD}"
  6. """
try:
    from shlex import quote
except ImportError:
    from pipes import quote  # Python 2 fallback
import collections
import contextlib
import copy
import glob
import json
import logging
import os
import pprint
import re
import signal
import string
import subprocess
import sys
import threading
import time
import traceback

log = logging.getLogger(__name__)

LOCAL_SUBMISSION_STRING = '/bin/bash -C ${CMD} >| ${STDOUT_FILE} 2>| ${STDERR_FILE}'  # for the job_local override
STATE_FN = 'state.py'
Job = collections.namedtuple('Job', ['jobid', 'cmd', 'rundir', 'options'])
MetaJob = collections.namedtuple('MetaJob', ['job', 'lang_exe'])
lang_python_exe = sys.executable
lang_bash_exe = '/bin/bash'

@contextlib.contextmanager
def cd(newdir):
    prevdir = os.getcwd()
    log.debug('CD: %r <- %r' % (newdir, prevdir))
    os.chdir(os.path.expanduser(newdir))
    try:
        yield
    finally:
        log.debug('CD: %r -> %r' % (newdir, prevdir))
        os.chdir(prevdir)
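
# Example usage of cd() (illustrative; the path is hypothetical):
#
#   with cd('/tmp/somewhere'):
#       pass  # cwd is /tmp/somewhere inside this block
#   # The previous cwd is restored on exit, even after an exception.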

class MetaJobClass(object):
    ext = {
        lang_python_exe: '.py',
        lang_bash_exe: '.bash',
    }
    def get_wrapper(self):
        # Totally by convention, for now.
        return '%s/run-%s%s' % (self.mj.job.rundir, self.mj.job.jobid, self.ext[self.mj.lang_exe])
    def get_sentinel(self):
        return 'exit-%s' % self.mj.job.jobid  # in watched dir
    def get_pid(self):
        # Note: MetaJob has no 'pid' field, so this would raise
        # AttributeError if ever called. Only kill() below uses it.
        return self.mj.pid
    def kill(self, pid, sig):
        stored_pid = self.get_pid()
        if not pid:
            pid = stored_pid
            log.info('Not passed a pid to kill. Using stored pid:%s' % pid)
        if pid and stored_pid:
            if pid != stored_pid:
                log.error('pid:%s != stored_pid:%s' % (pid, stored_pid))
        os.kill(pid, sig)
    def __init__(self, mj):
        self.mj = mj
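
# By the get_wrapper() convention, a bash job with jobid 'job-A' and rundir
# '/work/run' (hypothetical values) maps to the wrapper path
# '/work/run/run-job-A.bash'.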

class State(object):
    def notify_threaded(self, jobid):
        self.jobids_threaded.add(jobid)
    def notify_started(self, jobid):
        #state.top['jobids_submitted'].append(jobid)
        self.jobids_submitted.add(jobid)
        self.jobids_threaded.remove(jobid)
        log.debug('Thread notify_started({}).'.format(jobid))
    def notify_exited(self, jobid, rc):
        #self.top['jobid2exit'][jobid] = rc
        self.jobid2exit[jobid] = rc
        self.jobids_submitted.remove(jobid)
        log.debug('Thread notify_exited({}->{}).'.format(jobid, rc))
    def set_job(self, jobid, mjob):
        # Is this needed? For now, we are not actually saving state, so no.
        self.top['jobs'][jobid] = mjob
    def update_jobid2status(self, jobid2status):
        for jobid in self.jobids_threaded:
            jobid2status[jobid] = 'THREADED'
        for jobid in self.jobids_submitted:
            # 'RUNNING' is optimistic: with blocking qsub calls, the job
            # might not have started yet, or it could already be dead.
            jobid2status[jobid] = 'RUNNING'
        for jobid, rc in self.jobid2exit.items():
            jobid2status[jobid] = 'EXIT {}'.format(rc)
    def get_running_jobids(self):
        return list(self.jobids_submitted)
    def serialize(self):
        return pprint.pformat(self.top)
    @staticmethod
    def deserialize(directory, content):
        state = State(directory)
        state.top = eval(content)  # Trusts STATE_FN content; acceptable for our own state file.
        state.content_prev = content
        return state
    @staticmethod
    def create(directory):
        state = State(directory)
        #makedirs(state.get_directory_wrappers())
        #makedirs(state.get_directory_jobs())
        return state
    def __init__(self, directory):
        self.__directory = os.path.abspath(directory)
        self.content_prev = ''
        self.top = dict()  # for serialization, when we decide we need it
        self.top['jobs'] = dict()
        #self.top['jobids_submitted'] = list()
        #self.top['jobid2exit'] = dict()
        self.jobids_threaded = set()
        self.jobids_submitted = set()
        self.jobid2exit = dict()
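
# Lifecycle of a jobid within State, as driven by StringJobSubmitter and
# JobThread below:
#   notify_threaded(j)    # j enters jobids_threaded    -> status 'THREADED'
#   notify_started(j)     # j moves to jobids_submitted -> status 'RUNNING'
#   notify_exited(j, rc)  # j moves to jobid2exit       -> status 'EXIT rc'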

class SafeState(object):
    """Synchronized State proxy for accessing any
    data which might be modified in a Thread.
    """
    def notify_threaded(self, jobid):
        with self.lock:
            self.state.notify_threaded(jobid)
    def notify_started(self, jobid):
        with self.lock:
            self.state.notify_started(jobid)
    def notify_exited(self, jobid, rc):
        with self.lock:
            self.state.notify_exited(jobid, rc)
    def update_jobid2status(self, table):
        with self.lock:
            return self.state.update_jobid2status(table)
    def get_running_jobids(self):
        with self.lock:
            return self.state.get_running_jobids()
    def serialize(self):
        with self.lock:
            return self.state.serialize()
    def __getattr__(self, name):
        """For all other methods, just delegate.
        """
        return getattr(self.state, name)
    def __init__(self, state):
        self.state = state
        self.lock = threading.Lock()

def get_state(directory):
    """For now, we never write.
    """
    state_fn = os.path.join(directory, STATE_FN)
    if not os.path.exists(state_fn):
        return State.create(directory)
    assert False, 'No state directory needed, for now.'
    # Everything below is unreachable until we start persisting state again.
    try:
        return State.deserialize(directory, open(state_fn).read())
    except Exception:
        log.exception('Failed to read state "%s". Ignoring (and soon over-writing) current state.' % state_fn)
        # TODO: Backup previous STATE_FN?
        return State(directory)

def State_save(state):
    # TODO: RW Locks, maybe for runtime of whole program.
    # Note: State does not define get_state_fn() yet, but this function is
    # currently never called (see the commented-out calls below).
    content = state.serialize()
    content_prev = state.content_prev
    if content == content_prev:
        return
    fn = state.get_state_fn()
    open(fn, 'w').write(content)
    log.debug('saved state to %s' % repr(os.path.abspath(fn)))

def Job_get_MetaJob(job, lang_exe=lang_bash_exe):
    return MetaJob(job, lang_exe=lang_exe)

def MetaJob_wrap(mjob, state):
    """Write wrapper contents to mjob.wrapper.
    """
    metajob_rundir = mjob.job.rundir
    wdir = metajob_rundir
    bash_template = """#!%(lang_exe)s
cmd="%(cmd)s"
rundir="%(rundir)s"
finish() {
  echo "finish code: $?"
}
trap finish 0
#printenv
echo
set -ex
while [ ! -d "$rundir" ]; do sleep 1; done
cd "$rundir"
eval "$cmd"
"""
    mji = MetaJobClass(mjob)
    wrapper_fn = os.path.join(wdir, mji.get_wrapper())
    command = mjob.job.cmd
    wrapped = bash_template % dict(
        lang_exe=mjob.lang_exe,
        cmd=command,
        rundir=metajob_rundir,
    )
    log.debug('Writing wrapper "%s"' % wrapper_fn)
    open(wrapper_fn, 'w').write(wrapped)
    st = os.stat(wrapper_fn)
    os.chmod(wrapper_fn, st.st_mode | 0o111)
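
# For a hypothetical Job(jobid='job-A', cmd='echo hi', rundir='/work/run')
# with lang_exe='/bin/bash', MetaJob_wrap() writes /work/run/run-job-A.bash
# roughly as:
#
#   #!/bin/bash
#   cmd="echo hi"
#   rundir="/work/run"
#   ...
#   while [ ! -d "$rundir" ]; do sleep 1; done
#   cd "$rundir"
#   eval "$cmd"
#
# The while-loop tolerates filesystem lag: the rundir might not be visible
# yet on the machine where the job lands.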

class JobThread(threading.Thread):
    def run(self):
        """Propagate environment, plus env_extra.
        """
        try:
            self.notify_start(self.jobname)
            log.debug('====>hello! started Thread {}'.format(threading.current_thread()))
            myenv = dict(os.environ)
            myenv.update(self.env_extra)
            #log.debug('myenv:\n{}'.format(pprint.pformat(myenv)))
            log.info("====>Popen: '{}'".format(self.cmd))
            if not self.cmd:
                msg = 'Why is self.cmd empty? {} {} {!r}'.format(self, self.jobname, self.cmd)
                raise Exception(msg)
            p = subprocess.Popen(self.cmd, env=myenv, shell=True)
            log.debug("====>pid: {}".format(p.pid))
            p.wait()
            rc = p.returncode
            log.debug("====>rc: {}".format(rc))
            self.notify_exit(self.jobname, rc)
        except Exception:
            log.exception('Failed to submit {}: {!r} Setting rc=42.'.format(self.jobname, self.cmd))
            self.notify_exit(self.jobname, 42)
    def __init__(self, jobname, cmd, notify_start, notify_exit, env_extra):
        super(JobThread, self).__init__()
        self.jobname = jobname
        self.cmd = cmd
        self.notify_start = notify_start
        self.notify_exit = notify_exit
        self.env_extra = env_extra
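
# Sketch of how JobThread is driven by StringJobSubmitter.start() below
# (values hypothetical):
#
#   th = JobThread('job-A', 'qsub ... "run-job-A.bash"',
#                  state.notify_started, state.notify_exited,
#                  {'PYPEFLOW_JOB_START_SCRIPT': '/work/run/run-job-A.bash',
#                   'PYPEFLOW_JOB_START_TIMEOUT': '60'})
#   th.start()  # notify_exited('job-A', rc) fires when the command returns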

class StringJobSubmitter(object):
    """Substitute some variables into self.submission_string.
    Use mains/job_start.sh as the top script. That requires
    PYPEFLOW_JOB_START_SCRIPT in the environment as the real
    script to run. This way, we are guaranteed that the top script exists,
    and we can wait for the rest to appear in the filesystem.
    """
    def submit(self, jobname, mjob, state):
        """Prepare job (based on wrappers) and submit as a new thread.
        """
        state.set_job(jobname, mjob)
        jobname = mjob.job.jobid
        job_dict = mjob.job.options
        #nproc = mjob.job.options['NPROC']
        #mb = mjob.job.options['MB']
        mji = MetaJobClass(mjob)
        #script_fn = os.path.join(state.get_directory_wrappers(), mji.get_wrapper())
        script_fn = mji.get_wrapper()
        exe = mjob.lang_exe
        state.notify_threaded(jobname)
        self.start(jobname, state, exe, script_fn, job_dict)  # Can raise.
    def get_cmd(self, job_name, script_fn, job_dict):
        """Vars:
        (The old ones.) JOB_ID, STDOUT_FILE, STDERR_FILE, NPROC, MB, CMD
        """
        # We wrap in a program that waits for the executable to exist, so
        # the filesystem has time to catch up on the remote machine.
        # Hopefully, this will allow dependencies to become ready as well.
        job_start_fn = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mains/job_start.sh')
        stdout = script_fn + '.stdout'
        stderr = script_fn + '.stderr'
        run_dir = os.getcwd()
        mapping = dict(
            JOB_EXE='/bin/bash',
            JOB_NAME=job_name, JOB_ID=job_name,
            #JOB_OPTS=JOB_OPTS,
            #JOB_QUEUE=job_queue,
            JOB_SCRIPT=job_start_fn, CMD=job_start_fn,
            JOB_DIR=run_dir, DIR=run_dir,
            JOB_STDOUT=stdout, STDOUT_FILE=stdout,
            JOB_STDERR=stderr, STDERR_FILE=stderr,
            #MB=pypeflow_mb,
            #NPROC=pypeflow_nproc,
        )
        mapping.update(job_dict)
        if 'JOB_OPTS' in mapping:
            # A special two-level mapping: ${JOB_OPTS} is substituted first.
            mapping['JOB_OPTS'] = self.sub(mapping['JOB_OPTS'], mapping)
        return self.sub(self.submission_string, mapping)
    @staticmethod
    def sub(template, mapping):
        t = string.Template(template)
        try:
            return t.substitute(mapping)
        except KeyError:
            msg = 'Template substitution failed:\n template={!r}\n mapping={}'.format(
                template, pprint.pformat(mapping))
            log.exception(msg)
            raise
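
    # Example of the two-level JOB_OPTS substitution (hypothetical values):
    #
    #   mapping = dict(JOB_OPTS='-pe smp ${NPROC}', NPROC='4', CMD='wrap.sh')
    #   sub('${JOB_OPTS}', mapping)              # -> '-pe smp 4'
    #   sub('qsub ${JOB_OPTS} ${CMD}', mapping)  # -> 'qsub -pe smp 4 wrap.sh'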
    def start(self, jobname, state, exe, script_fn, job_dict):
        """Run job in a thread.
        The thread will notify state.
        Can raise.
        """
        #cmd = script_fn
        cmd = self.get_cmd(jobname, script_fn, job_dict)
        # job_start.sh relies on PYPEFLOW_*.
        env_extra = {
            "PYPEFLOW_JOB_START_SCRIPT": script_fn,
            "PYPEFLOW_JOB_START_TIMEOUT": "60",
        }
        log.debug('env_extra={}'.format(pprint.pformat(env_extra)))
        notify_start = state.notify_started
        notify_exit = state.notify_exited
        th = JobThread(jobname, cmd, notify_start, notify_exit, env_extra)
        #th.setDaemon(True)
        th.start()
    def __repr__(self):
        return 'StringJobSubmitter(%s)' % repr(self.submission_string)
    def __init__(self, submission_string):
        self.submission_string = submission_string

def link_rundir(state_rundir, user_rundir):
    if user_rundir:
        link_fn = os.path.join(user_rundir, 'pwatcher.dir')
        if os.path.lexists(link_fn):
            os.unlink(link_fn)
        os.symlink(os.path.abspath(state_rundir), link_fn)

def cmd_run(state, jobids, job_type, job_dict):
    """Wrap the jobs and run them locally, each in the foreground of a thread.
    """
    jobs = dict()
    submitted = list()
    result = {'submitted': submitted}
    if job_type != 'string':
        log.debug("NOTE: In blocking pwatcher, job_type={!r}, should be 'string'".format(job_type))
    for jobid, desc in jobids.items():
        assert 'cmd' in desc
        cmd = desc['cmd']
        if 'rundir' in desc:
            rundir = desc['rundir']
        else:
            rundir = os.path.dirname(cmd)
        # These are all required now.
        #nproc = desc['job_nproc']
        #mb = desc['job_mb']
        local = int(desc['job_local'])
        options = copy.deepcopy(desc['job_dict'])  #dict(NPROC=nproc, MB=mb, local=local)
        options['local'] = local
        jobs[jobid] = Job(jobid, cmd, rundir, options)
    log.debug('jobs:\n%s' % pprint.pformat(jobs))
    submission_string = job_dict['submit']
    basic_submitter = StringJobSubmitter(submission_string)
    local_submitter = StringJobSubmitter(LOCAL_SUBMISSION_STRING)
    log.debug('Basic submitter: {!r}'.format(basic_submitter))
    for jobid, job in jobs.items():
        #desc = jobids[jobid]
        log.debug(' starting job %s' % pprint.pformat(job))
        mjob = Job_get_MetaJob(job)
        MetaJob_wrap(mjob, state)
        try:
            #link_rundir(state.get_directory_job(jobid), desc.get('rundir'))
            if job.options['local']:
                submitter = local_submitter
            else:
                submitter = basic_submitter
                if not submission_string:
                    raise Exception('No "submit" key in job_dict:{!r}.'.format(job_dict))
            submitter.submit(jobid, mjob, state)
            submitted.append(jobid)
        except Exception:
            log.exception('Failed to submit background-job:\n{!r}'.format(
                submitter))
            # The caller is responsible for deciding what to do about
            # job-submission failures. Re-try, maybe?
            raise
    return result
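
# Shape of the cmd_run() inputs, as consumed above (values hypothetical):
#
#   jobids = {
#       'job-A': {
#           'cmd': '/bin/bash run.sh',
#           'rundir': '/work/run',   # optional; defaults to dirname(cmd)
#           'job_local': 0,          # 1 => use LOCAL_SUBMISSION_STRING
#           'job_dict': {'NPROC': '4', 'MB': '4000'},
#       },
#   }
#   job_dict = {'submit': 'qsub ... "${CMD}"'}  # the submission template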

def system(call, checked=False):
    log.info('!{}'.format(call))
    rc = os.system(call)
    if checked and rc:
        raise Exception('{} <- {!r}'.format(rc, call))
    return rc

_warned = dict()
def warnonce(hashkey, msg):
    if hashkey in _warned:
        return
    log.warning(msg)
    _warned[hashkey] = True

def cmd_query(state, which, jobids):
    """Return the state of named jobids.
    If which=='list', then query jobs listed as jobids.
    If which=='known', then query all known jobs.
    If which=='infer', same as 'known' now.
    """
    result = dict()
    jobstats = dict()
    result['jobids'] = jobstats
    if which == 'list':
        for jobid in jobids:
            jobstats[jobid] = 'UNKNOWN'
    state.update_jobid2status(jobstats)
    jobids = set(jobids)
    if which == 'list':
        for jobid in list(jobstats.keys()):
            # TODO: This might remove thousands. We should pass jobids along to update_jobid2status().
            if jobid not in jobids:
                del jobstats[jobid]
    return result

def cmd_delete(state, which, jobids):
    """Kill designated jobs, including (hopefully) their
    entire process groups.
    If which=='list', then kill all jobs listed as jobids.
    If which=='known', then kill all known jobs.
    If which=='infer', then kill all jobs with heartbeats.
    """
    log.error('Noop. We cannot kill blocked threads. Hopefully, everything will die on SIGTERM.')

def makedirs(path):
    if not os.path.isdir(path):
        os.makedirs(path)

def readjson(ifs):
    """Delete any keys that start with '~'.
    That lets us have trailing commas on all other lines.
    """
    content = ifs.read()
    log.debug('content:%s' % repr(content))
    jsonval = json.loads(content)
    #pprint.pprint(jsonval)
    def striptildes(subd):
        if not isinstance(subd, dict):
            return
        for k, v in list(subd.items()):
            if k.startswith('~'):
                del subd[k]
            else:
                striptildes(v)
    striptildes(jsonval)
    #pprint.pprint(jsonval)
    return jsonval
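
# Example input for readjson() (illustrative). The '~' key is removed, which
# lets the real entries above it keep their trailing commas:
#
#   {
#       "which": "list",
#       "jobids": ["job-A"],
#       "~": "ignored"
#   }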

class ProcessWatcher(object):
    def run(self, jobids, job_type, job_defaults_dict):
        #import traceback; log.debug(''.join(traceback.format_stack()))
        log.debug('run(jobids={}, job_type={}, job_defaults_dict={})'.format(
            '<%s>' % len(jobids), job_type, job_defaults_dict))
        return cmd_run(self.state, jobids, job_type, job_defaults_dict)
    def query(self, which='list', jobids=[]):
        log.debug('query(which={!r}, jobids={})'.format(
            which, '<%s>' % len(jobids)))
        return cmd_query(self.state, which, jobids)
    def delete(self, which='list', jobids=[]):
        log.debug('delete(which={!r}, jobids={})'.format(
            which, '<%s>' % len(jobids)))
        return cmd_delete(self.state, which, jobids)
    def __init__(self, state):
        # state must be thread-safe
        self.state = state

def get_process_watcher(directory):
    state = get_state(directory)
    state = SafeState(state)  # thread-safe proxy
    #log.debug('state =\n%s' % pprint.pformat(state.top))
    #State_save(state)
    return ProcessWatcher(state)

@contextlib.contextmanager
def process_watcher(directory):
    """This will (someday) hold a lock, so that
    the State can be written safely at the end.
    """
    state = get_state(directory)
    state = SafeState(state)  # thread-safe proxy
    #log.debug('state =\n%s' % pprint.pformat(state.top))
    yield ProcessWatcher(state)
    #State_save(state)
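
# Minimal end-to-end sketch (illustrative; paths and job-ids hypothetical):
#
#   with process_watcher('mainpwatcher') as watcher:
#       watcher.run(
#           jobids={'job-A': dict(cmd='echo hi', rundir='/tmp',
#                                 job_local=1, job_dict={})},
#           job_type='string',
#           job_defaults_dict={'submit': LOCAL_SUBMISSION_STRING})
#       while watcher.state.get_running_jobids():
#           time.sleep(1)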

def main(prog, cmd, state_dir='mainpwatcher', argsfile=None):
    logging.basicConfig()
    logging.getLogger().setLevel(logging.NOTSET)
    log.warning('logging basically configured')
    log.debug('debug mode on')
    assert cmd in ['run', 'query', 'delete']
    ifs = sys.stdin if not argsfile else open(argsfile)
    argsdict = readjson(ifs)
    log.info('argsdict =\n%s' % pprint.pformat(argsdict))
    with process_watcher(state_dir) as watcher:
        result = getattr(watcher, cmd)(**argsdict)
        if result is not None:
            log.info('getattr({!r}, {!r}): {}'.format(
                watcher, cmd, pprint.pformat(result)))
        log.info('Waiting for running jobs...')
        while watcher.state.get_running_jobids():
            log.info('running: {!s}'.format(watcher.state.get_running_jobids()))
            time.sleep(1)

if __name__ == "__main__":
    #import pdb
    #pdb.set_trace()
    main(*sys.argv)  # pylint: disable=no-value-for-parameter