  1. """Filesytem-based process-watcher.
  2. This is meant to be part of a 2-process system. For now, let's call these processes the Definer and the Watcher.
  3. * The Definer creates a graph of tasks and starts a resolver loop, like pypeflow. It keeps a Waiting list, a Running list, and a Done list. It then communicates with the Watcher.
  4. * The Watcher has 3 basic functions in its API.
  5. 1. Spawn jobs.
  6. 2. Kill jobs.
  7. 3. Query jobs.
  8. 1. Spawning jobs
  9. The job definition includes the script, how to run it (locally, qsub, etc.), and maybe some details (unique-id, run-directory). The Watcher then:
  10. * wraps the script without something to update a heartbeat-file periodically,
  11. * spawns each job (possibly as a background process locally),
  12. * and records info (including PID or qsub-name) in a persistent database.
  13. 2. Kill jobs.
  14. Since it has a persistent database, it can always kill any job, upon request.
  15. 3. Query jobs.
  16. Whenever requested, it can poll the filesystem for all or any jobs, returning the subset of completed jobs. (For NFS efficiency, all the job-exit sentinel files can be in the same directory, along with the heartbeats.)
  17. The Definer would call the Watcher to spawn tasks, and then periodically to poll them. Because these are both now single-threaded, the Watcher *could* be a function within the Definer, or a it could be blocking call to a separate process. With proper locking on the database, users could also query the same executable as a separate process.
  18. Caching/timestamp-checking would be done in the Definer, flexibly specific to each Task.
  19. Eventually, the Watcher could be in a different programming language. Maybe perl. (In bash, a background heartbeat gets is own process group, so it can be hard to clean up.)
  20. """
try:
    from shlex import quote
except ImportError:
    from pipes import quote
import collections
import contextlib
import copy
import glob
import json
import logging
import os
import pprint
import re
import signal
import string
import subprocess
import sys
import time
import traceback
from pypeflow.io import capture, syscall

log = logging.getLogger(__name__)

HEARTBEAT_RATE_S = 10.0
ALLOWED_SKEW_S = 120.0
STATE_FN = 'state.py'
Job = collections.namedtuple('Job', ['jobid', 'cmd', 'rundir', 'options'])
MetaJob = collections.namedtuple('MetaJob', ['job', 'lang_exe'])
lang_python_exe = sys.executable
lang_bash_exe = '/bin/bash'
@contextlib.contextmanager
def cd(newdir):
    prevdir = os.getcwd()
    log.debug('CD: %r <- %r' %(newdir, prevdir))
    os.chdir(os.path.expanduser(newdir))
    try:
        yield
    finally:
        log.debug('CD: %r -> %r' %(newdir, prevdir))
        os.chdir(prevdir)
class MetaJobClass(object):
    ext = {
        lang_python_exe: '.py',
        lang_bash_exe: '.bash',
    }
    def get_wrapper(self):
        return 'run-%s%s' %(self.mj.job.jobid, self.ext[self.mj.lang_exe])
    def get_sentinel(self):
        return 'exit-%s' %self.mj.job.jobid # in watched dir
    def get_heartbeat(self):
        return 'heartbeat-%s' %self.mj.job.jobid # in watched dir
    def get_pid(self):
        return self.mj.pid
    def kill(self, pid, sig):
        stored_pid = self.get_pid()
        if not pid:
            pid = stored_pid
            log.info('Not passed a pid to kill. Using stored pid:%s' %pid)
        if pid and stored_pid:
            if pid != stored_pid:
                log.error('pid:%s != stored_pid:%s' %(pid, stored_pid))
        os.kill(pid, sig)
    def __init__(self, mj):
        self.mj = mj
class State(object):
    def get_state_fn(self):
        return os.path.join(self.__directory, STATE_FN)
    def get_directory(self):
        return self.__directory
    def get_directory_wrappers(self):
        return os.path.join(self.__directory, 'wrappers')
    def get_directory_heartbeats(self):
        return os.path.join(self.__directory, 'heartbeats')
    def get_directory_exits(self):
        return os.path.join(self.__directory, 'exits')
    def get_directory_jobs(self):
        # B/c the other directories can get big, we put most per-job data here, under each jobid.
        return os.path.join(self.__directory, 'jobs')
    def get_directory_job(self, jobid):
        return os.path.join(self.get_directory_jobs(), jobid)
    def submit_background(self, bjob):
        """Run job in background.
        Record in state.
        """
        self.top['jobs'][bjob.mjob.job.jobid] = bjob
        jobid = bjob.mjob.job.jobid
        mji = MetaJobClass(bjob.mjob)
        script_fn = os.path.join(self.get_directory_wrappers(), mji.get_wrapper())
        exe = bjob.mjob.lang_exe
        run_dir = self.get_directory_job(jobid)
        makedirs(run_dir)
        with cd(run_dir):
            bjob.submit(self, exe, script_fn) # Can raise
        log.info('Submitted backgroundjob=%s'%repr(bjob))
        self.top['jobids_submitted'].append(jobid)
    def get_mji(self, jobid):
        mjob = self.top['jobs'][jobid].mjob
        return MetaJobClass(mjob)
    def get_bjob(self, jobid):
        return self.top['jobs'][jobid]
    def get_bjobs(self):
        return self.top['jobs']
    def get_mjobs(self):
        return {jobid: bjob.mjob for jobid, bjob in self.top['jobs'].items()}
    def add_deleted_jobid(self, jobid):
        self.top['jobids_deleted'].append(jobid)
    def serialize(self):
        return pprint.pformat(self.top)
    @staticmethod
    def deserialize(directory, content):
        state = State(directory)
        state.top = eval(content)
        state.content_prev = content
        return state
    @staticmethod
    def create(directory):
        state = State(directory)
        makedirs(state.get_directory_wrappers())
        makedirs(state.get_directory_heartbeats())
        makedirs(state.get_directory_exits())
        #system('lfs setstripe -c 1 {}'.format(state.get_directory_heartbeats())) # no improvement noticed
        makedirs(state.get_directory_jobs())
        return state
    def __init__(self, directory):
        self.__directory = os.path.abspath(directory)
        self.content_prev = ''
        self.top = dict()
        self.top['jobs'] = dict()
        self.top['jobids_deleted'] = list()
        self.top['jobids_submitted'] = list()
def get_state(directory):
    state_fn = os.path.join(directory, STATE_FN)
    if not os.path.exists(state_fn):
        return State.create(directory)
    try:
        return State.deserialize(directory, open(state_fn).read())
    except Exception:
        log.exception('Failed to read state "%s". Ignoring (and soon over-writing) current state.'%state_fn)
        # TODO: Backup previous STATE_FN?
        return State(directory)
def State_save(state):
    # TODO: RW Locks, maybe for runtime of whole program.
    content = state.serialize()
    content_prev = state.content_prev
    if content == content_prev:
        return
    fn = state.get_state_fn()
    open(fn, 'w').write(content)
    log.debug('saved state to %s' %repr(os.path.abspath(fn)))
def Job_get_MetaJob(job, lang_exe=lang_bash_exe):
    return MetaJob(job, lang_exe=lang_exe)
def MetaJob_wrap(mjob, state):
    """Write wrapper contents to mjob.wrapper.
    """
    wdir = state.get_directory_wrappers()
    hdir = state.get_directory_heartbeats()
    edir = state.get_directory_exits()
    metajob_rundir = mjob.job.rundir

    bash_template = """#!%(lang_exe)s
printenv
echo
set -x
%(cmd)s
"""
    # We do not bother with 'set -e' here because this script is run either
    # in the background or via qsub.
    templates = {
        lang_python_exe: python_template,
        lang_bash_exe: bash_template,
    }
    mji = MetaJobClass(mjob)
    wrapper_fn = os.path.join(wdir, mji.get_wrapper())
    exit_sentinel_fn=os.path.join(edir, mji.get_sentinel())
    heartbeat_fn=os.path.join(hdir, mji.get_heartbeat())
    rate = HEARTBEAT_RATE_S
    command = mjob.job.cmd
    prog = 'heartbeat-wrapper' # missing in mobs
    prog = 'python3 -m pwatcher.mains.fs_heartbeat'
    heartbeat_wrapper_template = "{prog} --directory={metajob_rundir} --heartbeat-file={heartbeat_fn} --exit-file={exit_sentinel_fn} --rate={rate} {command} || echo 99 >| {exit_sentinel_fn}"
    # We write 99 into exit-sentinel if the wrapper fails.
    wrapped = heartbeat_wrapper_template.format(**locals())
    log.debug('Wrapped "%s"' %wrapped)
    wrapped = templates[mjob.lang_exe] %dict(
        lang_exe=mjob.lang_exe,
        cmd=wrapped,
    )
    log.debug('Writing wrapper "%s"' %wrapper_fn)
    open(wrapper_fn, 'w').write(wrapped)
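# For illustration only (paths, jobid, and command are hypothetical): for a bash
# job, the wrapper written above would look roughly like
#
#   #!/bin/bash
#   printenv
#   echo
#   set -x
#   python3 -m pwatcher.mains.fs_heartbeat --directory=/abs/rundir --heartbeat-file=/abs/mypwatcher/heartbeats/heartbeat-job-0001 --exit-file=/abs/mypwatcher/exits/exit-job-0001 --rate=10.0 bash task.sh || echo 99 >| /abs/mypwatcher/exits/exit-job-0001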
def background(script, exe='/bin/bash'):
    """Start script in background (so it keeps going when we exit).
    Run in cwd.
    For now, stdout/stderr are captured.
    Return pid.
    """
    args = [exe, script]
    sin = open(os.devnull)
    sout = open('stdout', 'w')
    serr = open('stderr', 'w')
    pseudo_call = '{exe} {script} 1>|stdout 2>|stderr & '.format(exe=exe, script=script)
    log.info('dir: {!r}\nCALL:\n {!r}'.format(os.getcwd(), pseudo_call))
    proc = subprocess.Popen([exe, script], stdin=sin, stdout=sout, stderr=serr)
    pid = proc.pid
    log.info('pid=%s pgid=%s sub-pid=%s' %(os.getpid(), os.getpgid(0), proc.pid))
    #checkcall = 'ls -l /proc/{}/cwd'.format(
    #    proc.pid)
    #system(checkcall, checked=True)
    return pid
def qstripped(option, flag='-q'):
    """Given a string of options, remove any -q foo.
    (No longer used.)
    >>> qstripped('-xy -q foo -z bar')
    '-xy -z bar'
    >>> qstripped('-xy -p foo -z bar', '-p')
    '-xy -z bar'
    """
    # For now, do not strip -qfoo
    vals = option.strip().split()
    while flag in vals:
        i = vals.index(flag)
        vals = vals[0:i] + vals[i+2:]
    return ' '.join(vals)
class MetaJobLocal(object):
    """For jobs on the local machine, with process-watching.
    We cannot simply run with '&' because then we would not know how
    to kill the new background job.
    """
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        pid = background(script_fn, exe=self.mjob.lang_exe)
    def kill(self, state, heartbeat):
        """Can raise.
        (Actually, we could derive heartbeat from state. But for now, we know it anyway.)
        """
        hdir = state.get_directory_heartbeats()
        heartbeat_fn = os.path.join(hdir, heartbeat)
        with open(heartbeat_fn) as ifs:
            line = ifs.readline()
            pid = line.split()[1]
            pid = int(pid)
            pgid = line.split()[2]
            pgid = int(pgid)
        sig = signal.SIGKILL
        log.info('Sending signal(%s) to pgid=-%s (pid=%s) based on heartbeat=%r' %(sig, pgid, pid, heartbeat))
        try:
            os.kill(-pgid, sig)
        except Exception:
            log.exception('Failed to kill(%s) pgid=-%s for %r. Trying pid=%s' %(sig, pgid, heartbeat_fn, pid))
            os.kill(pid, sig)
    def __repr__(self):
        return 'MetaJobLocal(%s)' %repr(self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob # PUBLIC
class MetaJobSubmit(object):
    """Generic job-submission, non-blocking.
    Add shebang to script.
    If running locally, then caller must append '&' onto job_submit to put job in background.
    """
    def submit(self, state, exe, script_fn):
        """Run in cwd, in background.
        Can raise.
        """
        run_dir = os.getcwd()
        job_name = self.get_job_name()
        #job_nproc = self.job_nproc
        #job_mb = self.job_mb
        #job_queue = self.job_queue
        # Add shebang, in case shell_start_mode=unix_behavior (for SGE).
        # https://github.com/PacificBiosciences/FALCON/pull/348
        with open(script_fn, 'r') as original: data = original.read()
        with open(script_fn, 'w') as modified: modified.write("#!/bin/bash" + "\n" + data)
        mapping = dict(
            JOB_EXE='/bin/bash',
            JOB_NAME=job_name,
            #JOB_OPTS=JOB_OPTS,
            #JOB_QUEUE=job_queue,
            JOB_SCRIPT=script_fn, CMD=script_fn,
            JOB_DIR=run_dir, DIR=run_dir,
            JOB_STDOUT='stdout', STDOUT_FILE='stdout',
            JOB_STDERR='stderr', STDERR_FILE='stderr',
            #MB=pypeflow_mb,
            #NPROC=pypeflow_nproc,
        )
        mapping.update(self.job_dict)
        if 'JOB_OPTS' in mapping:
            # a special two-level mapping: ${JOB_OPTS} is substituted first
            mapping['JOB_OPTS'] = self.sub(mapping['JOB_OPTS'], mapping)
        sge_cmd = self.sub(self.submit_template, mapping)
        self.submit_capture = capture(sge_cmd)
    def kill(self, state, heartbeat=None):
        """Can raise.
        """
        #hdir = state.get_directory_heartbeats()
        #heartbeat_fn = os.path.join(hdir, heartbeat)
        #jobid = self.mjob.job.jobid
        job_name = self.get_job_name()
        job_num = self.get_job_num()
        mapping = dict(
            JOB_NAME=job_name,
            JOB_NUM=job_num,
        )
        mapping.update(self.job_dict)
        sge_cmd = self.sub(self.kill_template, mapping)
        system(sge_cmd, checked=False)
    def sub(self, unsub, mapping):
        return string.Template(unsub).substitute(mapping)
    def get_job_name(self):
        """Some systems are limited to 15 characters, but we expect that to be truncated by the caller.
        TODO: Choose a sequential jobname and record it. Priority: low, since collisions are very unlikely.
        """
        # jobid is an overloaded term in the pbsmrtpipe world, so we use job_name here.
        return self.mjob.job.jobid
    def get_job_num(self):
        """For now, just the jobname.
        """
        return self.mjob.job.jobid
    def __repr__(self):
        return '{}({!r})'.format(self.__class__.__name__, self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob
        if not hasattr(self, 'JOB_OPTS'):
            self.JOB_OPTS = None # unreachable, since this is an abstract class
        self.job_dict = copy.deepcopy(self.mjob.job.options)
        jd = self.job_dict
        if 'submit' in jd:
            self.submit_template = jd['submit']
        if 'kill' in jd:
            self.kill_template = jd['kill']
        if 'JOB_OPTS' not in jd and hasattr(self, 'JOB_OPTS'):
            jd['JOB_OPTS'] = self.JOB_OPTS
        assert self.submit_template
        assert self.kill_template
        assert self.JOB_OPTS
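# Illustrative job options (values are hypothetical) for the generic submission
# path above. 'submit', 'kill', and 'JOB_OPTS' may come from job.options and
# override the per-scheduler defaults; ${JOB_OPTS} is expanded first, and the
# result is then substituted into the submit template:
#
#   options = {
#       'submit': 'qsub -V -N ${JOB_NAME} ${JOB_OPTS} -o ${JOB_STDOUT} -e ${JOB_STDERR} ${JOB_SCRIPT}',
#       'kill': 'qdel ${JOB_NAME}',
#       'JOB_OPTS': '-q ${JOB_QUEUE} -pe smp ${NPROC}',
#       'JOB_QUEUE': 'default',
#       'NPROC': 4,
#   }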
class MetaJobSge(MetaJobSubmit):
    def __init__(self, mjob):
        # '-V' => pass enV; '-j y' => combine out/err
        self.submit_template = 'qsub -V -N ${JOB_NAME} ${JOB_OPTS} -cwd -o ${JOB_STDOUT} -e ${JOB_STDERR} -S /bin/bash ${JOB_SCRIPT}'
        self.JOB_OPTS = '-q ${JOB_QUEUE} -pe smp ${NPROC}' # -l h_vmem=${MB}M does not work within PacBio
        self.kill_template = 'qdel ${JOB_NAME}'
        super(MetaJobSge, self).__init__(mjob)
class MetaJobPbs(MetaJobSubmit):
    """
    usage: qsub [-a date_time] [-A account_string] [-c interval]
        [-C directive_prefix] [-e path] [-h ] [-I [-X]] [-j oe|eo] [-J X-Y[:Z]]
        [-k o|e|oe] [-l resource_list] [-m mail_options] [-M user_list]
        [-N jobname] [-o path] [-p priority] [-q queue] [-r y|n]
        [-S path] [-u user_list] [-W otherattributes=value...]
        [-v variable_list] [-V ] [-z] [script | -- command [arg1 ...]]
    """
    def get_job_num(self):
        """Really an Id, not a number, but JOB_ID was used for something else.
        See: https://github.com/PacificBiosciences/pypeFLOW/issues/54
        """
        cap = self.submit_capture
        try:
            re_cap = re.compile(r'\S+')
            mo = re_cap.search(cap)
            return mo.group(0)
        except Exception:
            log.exception('For PBS, failed to parse submit_capture={!r}\n Using job_name instead.'.format(cap))
            return self.mjob.job.jobid
    def __init__(self, mjob):
        self.submit_template = 'qsub -V -N ${JOB_NAME} ${JOB_OPTS} -o ${JOB_STDOUT} -e ${JOB_STDERR} -S /bin/bash ${JOB_SCRIPT}'
        self.JOB_OPTS = '-q ${JOB_QUEUE} --cpus-per-task=${NPROC} --mem-per-cpu=${MB}M'
        self.kill_template = 'qdel ${JOB_NAME}'
        super(MetaJobPbs, self).__init__(mjob)
class MetaJobTorque(MetaJobSubmit):
    # http://docs.adaptivecomputing.com/torque/4-0-2/help.htm#topics/commands/qsub.htm
    def __init__(self, mjob):
        self.submit_template = 'qsub -V -N ${JOB_NAME} ${JOB_OPTS} -d ${JOB_DIR} -o ${JOB_STDOUT} -e ${JOB_STDERR} -S /bin/bash ${JOB_SCRIPT}'
        self.JOB_OPTS = '-q ${JOB_QUEUE} -l procs=${NPROC}'
        self.kill_template = 'qdel ${JOB_NUM}'
        super(MetaJobTorque, self).__init__(mjob)
class MetaJobSlurm(MetaJobSubmit):
    def __init__(self, mjob):
        self.submit_template = 'sbatch -J ${JOB_NAME} ${JOB_OPTS} -D ${JOB_DIR} -o ${JOB_STDOUT} -e ${JOB_STDERR} --wrap="/bin/bash ${JOB_SCRIPT}"'
        self.JOB_OPTS = '-p ${JOB_QUEUE} --mincpus=${NPROC} --mem-per-cpu=${MB}'
        self.kill_template = 'scancel -n ${JOB_NUM}'
        super(MetaJobSlurm, self).__init__(mjob)
class MetaJobLsf(MetaJobSubmit):
    def __init__(self, mjob):
        self.submit_template = 'bsub -J ${JOB_NAME} ${JOB_OPTS} -o ${JOB_STDOUT} -e ${JOB_STDERR} "/bin/bash ${JOB_SCRIPT}"'
        # "Sets the user's execution environment for the job, including the current working directory, file creation mask, and all environment variables, and sets LSF environment variables before starting the job."
        self.JOB_OPTS = '-q ${JOB_QUEUE} -n ${NPROC}'
        self.kill_template = 'bkill -J ${JOB_NUM}'
        super(MetaJobLsf, self).__init__(mjob)
def link_rundir(state_rundir, user_rundir):
    if user_rundir:
        link_fn = os.path.join(user_rundir, 'pwatcher.dir')
        if os.path.lexists(link_fn):
            os.unlink(link_fn)
        os.symlink(os.path.abspath(state_rundir), link_fn)
def cmd_run(state, jobids, job_type, job_defaults_dict):
    """jobids maps each unique job-id to a job-description (cmd, rundir, options).
    Wrap each job and run it in the background, locally or via the configured job-submission system.
    """
    # We don't really need job_defaults_dict as they were already
    # added to job_dict for each job.
    jobs = dict()
    submitted = list()
    result = {'submitted': submitted}
    for jobid, desc in jobids.items():
        options = copy.deepcopy(desc['job_dict']) # defaults were already applied here
        if not options.get('job_type'):
            options['job_type'] = job_type
        if int(desc['job_local']):
            options['job_type'] = 'local'
        jobs[jobid] = Job(jobid, desc['cmd'], desc['rundir'], options)
    log.debug('jobs:\n{}'.format(pprint.pformat(jobs)))
    for jobid, job in jobs.items():
        desc = jobids[jobid]
        mjob = Job_get_MetaJob(job)
        MetaJob_wrap(mjob, state)
        options = job.options
        my_job_type = job.options['job_type']
        if my_job_type is None:
            my_job_type = job_type
        my_job_type = my_job_type.upper()
        log.info(' starting job {} w/ job_type={}'.format(pprint.pformat(job), my_job_type))
        if my_job_type == 'LOCAL':
            bjob = MetaJobLocal(mjob)
        elif my_job_type == 'SGE':
            bjob = MetaJobSge(mjob)
        elif my_job_type == 'PBS':
            bjob = MetaJobPbs(mjob)
        elif my_job_type == 'TORQUE':
            bjob = MetaJobTorque(mjob)
        elif my_job_type == 'SLURM':
            bjob = MetaJobSlurm(mjob)
        elif my_job_type == 'LSF':
            bjob = MetaJobLsf(mjob)
        else:
            raise Exception('Unknown my_job_type=%s' %repr(my_job_type))
        try:
            link_rundir(state.get_directory_job(jobid), desc.get('rundir'))
            state.submit_background(bjob)
            submitted.append(jobid)
        except Exception:
            log.exception('In pwatcher.fs_based.cmd_run(), failed to submit background-job:\n{!r}'.format(
                bjob))
            #raise
    return result
    # The caller is responsible for deciding what to do about job-submission failures. Re-try, maybe?
re_heartbeat = re.compile(r'heartbeat-(.+)')
def get_jobid_for_heartbeat(heartbeat):
    """This cannot fail unless we change the filename format.
    """
    mo = re_heartbeat.search(heartbeat)
    jobid = mo.group(1)
    return jobid
def system(call, checked=False):
    log.info('CALL:\n {}'.format(call))
    rc = os.system(call)
    if checked and rc:
        raise Exception('{} <- {!r}'.format(rc, call))
_warned = dict()
def warnonce(hashkey, msg):
    if hashkey in _warned:
        return
    log.warning(msg)
    _warned[hashkey] = True
def get_status(state, elistdir, reference_s, sentinel, heartbeat):
    heartbeat_path = os.path.join(state.get_directory_heartbeats(), heartbeat)
    # We take listdir so we can avoid extra system calls.
    if sentinel in elistdir:
        try:
            pass
            #os.remove(heartbeat_path) # Note: We no longer use the heartbeats.
        except Exception:
            log.debug('Unable to remove heartbeat {} when sentinel was found in exit-sentinels listdir.\n{}'.format(
                repr(heartbeat_path), traceback.format_exc()))
        sentinel_path = os.path.join(state.get_directory_exits(), sentinel)
        with open(sentinel_path) as ifs:
            rc = ifs.read().strip()
        return 'EXIT {}'.format(rc)
    ## TODO: Record last stat times, to avoid extra stat if too frequent.
    #try:
    #    mtime_s = os.path.getmtime(heartbeat_path)
    #    if (mtime_s + 3*HEARTBEAT_RATE_S) < reference_s:
    #        if (ALLOWED_SKEW_S + mtime_s + 3*HEARTBEAT_RATE_S) < reference_s:
    #            msg = 'DEAD job? {} + 3*{} + {} < {} for {!r}'.format(
    #                mtime_s, HEARTBEAT_RATE_S, ALLOWED_SKEW_S, reference_s, heartbeat_path)
    #            log.debug(msg)
    #            warnonce(heartbeat_path, msg)
    #            return 'DEAD'
    #        else:
    #            log.debug('{} + 3*{} < {} for {!r}. You might have a large clock-skew, or filesystem delays, or just filesystem time-rounding.'.format(
    #                mtime_s, HEARTBEAT_RATE_S, reference_s, heartbeat_path))
    #except Exception as exc:
    #    # Probably, somebody deleted it after our call to os.listdir().
    #    # TODO: Decide what this really means.
    #    log.debug('Heartbeat not (yet?) found at %r: %r' %(heartbeat_path, exc))
    #    return 'UNKNOWN'
    return 'RUNNING' # but actually it might not have started yet, or it could be dead, since we are not checking the heartbeat
def cmd_query(state, which, jobids):
    """Return the state of named jobids.
    See find_heartbeats().
    """
    found = dict()
    edir = state.get_directory_exits()
    for heartbeat in find_heartbeats(state, which, jobids):
        jobid = get_jobid_for_heartbeat(heartbeat)
        mji = state.get_mji(jobid)
        sentinel = mji.get_sentinel()
        #system('ls -l {}/{} {}/{}'.format(edir, sentinel, hdir, heartbeat), checked=False)
        found[jobid] = (sentinel, heartbeat)
    elistdir = os.listdir(edir)
    current_time_s = time.time()
    result = dict()
    jobstats = dict()
    result['jobids'] = jobstats
    for jobid, pair in found.items():
        sentinel, heartbeat = pair
        status = get_status(state, elistdir, current_time_s, sentinel, heartbeat)
        log.debug('Status %s for heartbeat:%s' %(status, heartbeat))
        jobstats[jobid] = status
    return result
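# Result shape (the jobids and statuses here are illustrative): cmd_query() returns
# a dict mapping each queried jobid to a status string, e.g.
#   {'jobids': {'job-0001': 'EXIT 0', 'job-0002': 'RUNNING'}}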
def get_jobid2pid(pid2mjob):
    result = dict()
    for pid, mjob in pid2mjob.items():
        jobid = mjob.job.jobid
        result[jobid] = pid
    return result
def find_heartbeats(state, which, jobids):
    """Yield heartbeat filenames.
    If which=='list', then query jobs listed as jobids.
    If which=='known', then query all known jobs.
    If which=='infer', then query all jobs with heartbeats.
    These are not quite finished, but already useful.
    """
    #log.debug('find_heartbeats for which=%s, jobids=%s' %(which, pprint.pformat(jobids)))
    if which == 'infer':
        for fn in glob.glob(os.path.join(state.get_directory_heartbeats(), 'heartbeat*')):
            yield fn
    elif which == 'known':
        jobid2mjob = state.get_mjobs()
        for jobid, mjob in jobid2mjob.items():
            mji = MetaJobClass(mjob)
            yield mji.get_heartbeat()
    elif which == 'list':
        jobid2mjob = state.get_mjobs()
        #log.debug('jobid2mjob:\n%s' %pprint.pformat(jobid2mjob))
        for jobid in jobids:
            #log.debug('jobid=%s; jobids=%s' %(repr(jobid), repr(jobids)))
            #if jobid not in jobid2mjob:
            #    log.info("jobid=%s is not known. Might have been deleted already." %jobid)
            mjob = jobid2mjob[jobid]
            mji = MetaJobClass(mjob)
            yield mji.get_heartbeat()
    else:
        raise Exception('which=%s'%repr(which))
def delete_heartbeat(state, heartbeat, keep=False):
    """
    Kill the job with this heartbeat.
    (If there is no heartbeat, then the job is already gone.)
    Delete the entry from state and update its jobid.
    Remove the heartbeat file, unless 'keep'.
    """
    hdir = state.get_directory_heartbeats()
    heartbeat_fn = os.path.join(hdir, heartbeat)
    jobid = get_jobid_for_heartbeat(heartbeat)
    try:
        bjob = state.get_bjob(jobid)
    except Exception:
        log.exception('In delete_heartbeat(), unable to find batchjob for %s (from %s)' %(jobid, heartbeat))
        log.warning('Cannot delete. You might be able to delete this yourself if you examine the content of %s.' %heartbeat_fn)
        # TODO: Maybe provide a default grid type, so we can attempt to delete anyway?
        return
    try:
        bjob.kill(state, heartbeat)
    except Exception as exc:
        log.exception('Failed to kill job for heartbeat {!r} (which might mean it was already gone): {!r}'.format(
            heartbeat, exc))
    state.add_deleted_jobid(jobid)
    # For now, keep it in the 'jobs' table.
    try:
        os.remove(heartbeat_fn)
        log.debug('Removed heartbeat=%s' %repr(heartbeat))
    except OSError as exc:
        log.debug('Cannot remove heartbeat {!r}: {!r}'.format(heartbeat_fn, exc))
    # Note: If sentinel suddenly appeared, that means the job exited. The pwatcher might wrongly think
    # it was deleted, but its output might be available anyway.
def cmd_delete(state, which, jobids):
    """Kill designated jobs, including (hopefully) their
    entire process groups.
    If which=='list', then kill all jobs listed as jobids.
    If which=='known', then kill all known jobs.
    If which=='infer', then kill all jobs with heartbeats.
    Remove those heartbeat files.
    """
    log.debug('Deleting jobs for jobids from %s (%s)' %(
        which, repr(jobids)))
    for heartbeat in find_heartbeats(state, which, jobids):
        delete_heartbeat(state, heartbeat)
def makedirs(path):
    if not os.path.isdir(path):
        os.makedirs(path)
def readjson(ifs):
    """Del keys that start with ~.
    That lets us have trailing commas on all other lines.
    """
    content = ifs.read()
    log.debug('content:%s' %repr(content))
    jsonval = json.loads(content)
    #pprint.pprint(jsonval)
    def striptildes(subd):
        if not isinstance(subd, dict):
            return
        for k,v in list(subd.items()):
            if k.startswith('~'):
                del subd[k]
            else:
                striptildes(v)
    striptildes(jsonval)
    #pprint.pprint(jsonval)
    return jsonval
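# Illustrative argsfile content (hypothetical values) as consumed by readjson()
# and main() below. The final '~'-prefixed key is dropped by striptildes(), which
# is what lets every real entry keep a trailing comma:
#
#   {
#       "job_type": "local",
#       "job_defaults_dict": {},
#       "jobids": {"job-0001": {"cmd": "bash task.sh", "rundir": "/abs/run", "job_local": 0, "job_dict": {}}},
#       "~comment": "keys starting with ~ are ignored"
#   }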
class ProcessWatcher(object):
    def run(self, jobids, job_type, job_defaults_dict):
        #import traceback; log.debug(''.join(traceback.format_stack()))
        log.debug('run(jobids={}, job_type={}, job_defaults_dict={})'.format(
            '<%s>'%len(jobids), job_type, job_defaults_dict))
        return cmd_run(self.state, jobids, job_type, job_defaults_dict)
    def query(self, which='list', jobids=[]):
        log.debug('query(which={!r}, jobids={})'.format(
            which, '<%s>'%len(jobids)))
        return cmd_query(self.state, which, jobids)
    def delete(self, which='list', jobids=[]):
        log.debug('delete(which={!r}, jobids={})'.format(
            which, '<%s>'%len(jobids)))
        return cmd_delete(self.state, which, jobids)
    def __init__(self, state):
        self.state = state
def get_process_watcher(directory):
    state = get_state(directory)
    #log.debug('state =\n%s' %pprint.pformat(state.top))
    return ProcessWatcher(state)
    #State_save(state)
@contextlib.contextmanager
def process_watcher(directory):
    """This will (someday) hold a lock, so that
    the State can be written safely at the end.
    """
    state = get_state(directory)
    #log.debug('state =\n%s' %pprint.pformat(state.top))
    yield ProcessWatcher(state)
    # TODO: Sometimes, maybe we should not save state.
    # Or maybe we *should* on exception.
    State_save(state)
def main(prog, cmd, state_dir='mainpwatcher', argsfile=None):
    logging.basicConfig()
    logging.getLogger().setLevel(logging.NOTSET)
    log.warning('logging basically configured')
    log.debug('debug mode on')
    assert cmd in ['run', 'query', 'delete']
    ifs = sys.stdin if not argsfile else open(argsfile)
    argsdict = readjson(ifs)
    log.info('argsdict =\n%s' %pprint.pformat(argsdict))
    with process_watcher(state_dir) as watcher:
        result = getattr(watcher, cmd)(**argsdict)
        if result is not None:
            print(pprint.pformat(result))
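# Hypothetical command line (the state directory and filename are illustrative):
#
#   python -m pwatcher.fs_based run mypwatcher run-args.json
#
# main() receives argv directly, so this would read the JSON job descriptions from
# run-args.json and keep watcher state under ./mypwatcher/. Note that the
# __main__ block at the bottom of this file drops into pdb before calling main().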
# With bash, we would need to set the session, rather than
# the process group. That's not ideal, but this is here for reference.
# http://stackoverflow.com/questions/6549663/how-to-set-process-group-of-a-shell-script
#
bash_template = """#!%(lang_exe)s
cmd='%(cmd)s'
"$cmd"
"""
# perl might be better, for efficiency.
# But we will use python for now.
#
python_template = r"""#!%(lang_exe)s
import threading, time, os, sys
cmd='%(cmd)s'
sentinel_fn='%(sentinel_fn)s'
heartbeat_fn='%(heartbeat_fn)s'
sleep_s=%(sleep_s)s
cwd='%(cwd)s'
os.chdir(cwd)
def log(msg):
    sys.stderr.write(msg)
    sys.stderr.write('\n')
    #sys.stdout.flush()
def thread_heartbeat():
    ofs = open(heartbeat_fn, 'w')
    pid = os.getpid()
    pgid = os.getpgid(0)
    x = 0
    while True:
        ofs.write('{} {} {}\n'.format(x, pid, pgid))
        ofs.flush()
        time.sleep(sleep_s)
        x += 1
def start_heartbeat():
    hb = threading.Thread(target=thread_heartbeat)
    log('alive? {}'.format(hb.is_alive()))
    hb.daemon = True
    hb.start()
    return hb
def main():
    log('cwd:{!r}'.format(os.getcwd()))
    if os.path.exists(sentinel_fn):
        os.remove(sentinel_fn)
    if os.path.exists(heartbeat_fn):
        os.remove(heartbeat_fn)
    os.system('touch {}'.format(heartbeat_fn))
    log("before: pid={}s pgid={}s".format(os.getpid(), os.getpgid(0)))
    try:
        os.setpgid(0, 0)
    except OSError as e:
        log('Unable to set pgid. Possibly a grid job? Hopefully there will be no dangling processes when killed: {}'.format(
            repr(e)))
    log("after: pid={}s pgid={}s".format(os.getpid(), os.getpgid(0)))
    hb = start_heartbeat()
    log('alive? {} pid={} pgid={}'.format(hb.is_alive(), os.getpid(), os.getpgid(0)))
    rc = os.system(cmd)
    # Do not delete the heartbeat here. The discoverer of the sentinel will do that,
    # to avoid a race condition.
    #if os.path.exists(heartbeat_fn):
    #    os.remove(heartbeat_fn)
    with open(sentinel_fn, 'w') as ofs:
        ofs.write(str(rc))
    # sys.exit(rc) # No-one would see this anyway.
    if rc:
        raise Exception('{} <- {!r}'.format(rc, cmd))
main()
"""
if __name__ == "__main__":
    import pdb
    pdb.set_trace()
    main(*sys.argv) # pylint: disable=no-value-for-parameter