  1. """Network-based process-watcher.
  2. This is meant to be part of a 2-process system. For now, let's call these
  3. processes the Definer and the Watcher.
  4. * The Definer creates a graph of tasks and starts a resolver loop, like
  5. pypeflow. It keeps a Waiting list, a Running list, and a Done list. It then
  6. communicates with the Watcher.
  7. * The Watcher has 3 basic functions in its API.
  8. 1. Spawn jobs.
  9. 2. Kill jobs.
  10. 3. Query jobs.
  11. 1. Spawning jobs
  12. The job definition includes the script, how to run it (locally, qsub, etc.),
  13. and maybe some details (unique-id, run-directory). The Watcher then:
  14. * wraps the script within something to update the heartbeat server
  15. periodically,
  16. * spawns each job (possibly as a background process locally),
  17. * and records info (including PID or qsub-name) in a persistent database.
  18. 2. Kill jobs.
  19. Since it has a persistent database, it can always kill any job, upon request.
  20. 3. Query jobs.
  21. Whenever requested, it can poll the server for all or any jobs, returning the
  22. subset of completed jobs.
  23. The Definer would call the Watcher to spawn tasks, and then periodically to poll
  24. them. Because these are both now single-threaded, the Watcher *could* be a
  25. function within the Definer, or a it could be blocking call to a separate
  26. process.
  27. Caching/timestamp-checking would be done in the Definer, flexibly specific to
  28. each Task.
  29. Eventually, the Watcher could be in a different programming language. Maybe
  30. perl. (In bash, a background heartbeat gets is own process group, so it can be
  31. hard to clean up.)
  32. """

try:
    from shlex import quote
except ImportError:
    from pipes import quote
import socketserver
import collections
import contextlib
import glob
import json
import logging
import os
import pprint
import re
import shutil
import signal
import socket
import subprocess
import sys
import multiprocessing
import time
import traceback

log = logging.getLogger(__name__)

HEARTBEAT_RATE_S = 600
STATE_FN = 'state.py'

Job = collections.namedtuple('Job', ['jobid', 'cmd', 'rundir', 'options'])
MetaJob = collections.namedtuple('MetaJob', ['job', 'lang_exe'])
lang_python_exe = sys.executable
lang_bash_exe = '/bin/bash'

@contextlib.contextmanager
def cd(newdir):
    prevdir = os.getcwd()
    log.debug('CD: %r <- %r' % (newdir, prevdir))
    os.chdir(os.path.expanduser(newdir))
    try:
        yield
    finally:
        log.debug('CD: %r -> %r' % (newdir, prevdir))
        os.chdir(prevdir)

# Send a message delimited with a \0.
def socket_send(socket, message):
    # bytes have no .format() in Python 3, so format as str, then encode
    socket.sendall('{}\0'.format(message).encode('ascii'))

# Receive all of a \0-delimited message.
# May discard content past the \0, if any, so it is not safe to call twice
# on the same socket.
def socket_read(socket):
    buffer = bytearray(b' ' * 256)
    nbytes = socket.recv_into(buffer, 256)
    if nbytes == 0: # empty message
        return
    message = ''
    while nbytes != 0:
        try: # index() raises when it can't find the delimiter
            i = buffer[:nbytes].index(b'\0')
            message += buffer[:i].decode('ascii') # discard past delimiter
            break
        except ValueError: # haven't reached the end yet
            message += buffer[:nbytes].decode('ascii')
            nbytes = socket.recv_into(buffer, 256)
    return message
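
# Example round-trip with these helpers (a sketch; assumes a StatusServer
# is listening at (host, port)):
#
#   s = socket.socket()
#   s.connect((host, port))
#   socket_send(s, 'Q myjob-0')   # ask for status
#   reply = socket_read(s)        # e.g. 'RUNNING 12' or 'EXIT 0'
#   s.close()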

# TODO: have state be persistent in some fashion, so we can be killed
# and restarted
class StatusServer(socketserver.BaseRequestHandler):
    """input format is "command [jobid [arg1 [arg2]]]"
    job status update commands are:
        i <jobid> <pid> <pgid>           - initialize job
        d <jobid>                        - delete job info
        e <jobid> <exit_status>          - job exit
        h <jobid>                        - heartbeat
        s <jobid> <stdout/stderr output> - record script output
    job status query commands are:
        A         - returns authentication key
        D         - returns the entire server state
        L         - returns a space-delimited list of all jobids
        P <jobid> - returns "pid pgid" if available
        Q <jobid> - for done/exited jobs returns "EXIT <exit_status>",
                    for others returns "RUNNING <seconds_since_last_update>"
    """
    def handle(self):
        message = socket_read(self.request)
        if not message:
            return
        args = message.split(None, 4)
        if not args: # make sure we have some arguments
            return
        command = args[0]
        if command == 'A': # send authentication key
            # need repr() as it's a byte string and may have zeroes
            socket_send(self.request, repr(multiprocessing.Process().authkey))
            return
        if command == 'D': # send server state
            socket_send(self.request, repr(self.server.server_job_list))
            return
        if command == 'L': # send list of jobids
            socket_send(self.request, ' '.join(self.server.server_job_list.keys()))
            return
        if len(args) < 2: # make sure we have a jobid argument
            return
        jobid = args[1]
        if command == 'i': # job start (with pid and pgid)
            if len(args) < 4: # missing pid/pgid
                self.server.server_job_list[jobid] = [time.time(), None, None, None]
            else:
                self.server.server_job_list[jobid] = [time.time(), args[2], args[3], None]
                with open(os.path.join(self.server.server_pid_dir, jobid), 'w') as f:
                    f.write('{} {}'.format(args[2], args[3]))
        elif command == 'h' or command == 'e' or command == 's': # updates
            log_file = os.path.join(self.server.server_log_dir, jobid)
            if jobid in self.server.server_job_list:
                j = self.server.server_job_list[jobid]
                j[0] = time.time()
            else: # unknown jobid, so create job
                j = self.server.server_job_list[jobid] = [time.time(), None, None, None]
                with open(log_file, 'w'):
                    pass
            os.utime(log_file, (j[0], j[0]))
            if command == 'e': # exit (with exit code)
                if len(args) < 3: # lack of an exit code is an error (bug)
                    j[3] = '99'
                else:
                    j[3] = args[2]
                with open(os.path.join(self.server.server_exitrc_dir, jobid), 'w') as f:
                    f.write(str(j[3]))
            elif command == 's': # record log output
                if len(args) > 2:
                    with open(log_file, 'a') as f:
                        f.writelines(' '.join(args[2:]))
        elif jobid not in self.server.server_job_list: # unknown jobid
            return
        elif command == 'd': # delete job info
            del self.server.server_job_list[jobid]
        elif command == 'Q' or command == 'P': # queries
            j = self.server.server_job_list[jobid]
            if command == 'Q': # query job status
                if j[3]: # has exit status
                    socket_send(self.request, 'EXIT {}'.format(j[3]))
                else: # return time since last heartbeat
                    # convert to int to keep the message shorter -
                    # we don't need the extra precision, anyway
                    diff = int(time.time() - j[0])
                    if diff < 0: # possible with a clock change
                        diff = 0
                    socket_send(self.request, 'RUNNING {}'.format(diff))
            elif command == 'P': # get job pid and pgid
                if j[1] is not None and j[2] is not None:
                    socket_send(self.request, '{} {}'.format(j[1], j[2]))
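
# Example session, as seen by the server (one connection per command;
# ids and values are illustrative):
#
#   i myjob-0 4242 4242   -> records pid/pgid, writes the pid file
#   h myjob-0             -> refreshes the heartbeat timestamp
#   Q myjob-0             <- 'RUNNING 3' (seconds since last update)
#   e myjob-0 0           -> records exit status, writes the exitrc file
#   Q myjob-0             <- 'EXIT 0'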

# Get the local ip address while avoiding some common problems.
def get_localhost_ipaddress(hostname, port):
    if hostname == '0.0.0.0':
        # use getaddrinfo to work with ipv6
        addr_list = socket.getaddrinfo(socket.gethostname(), port, socket.AF_INET, socket.SOCK_STREAM)
        for a in addr_list:
            # TODO: have this part work with ipv6 addresses
            if len(a[4]) == 2 and a[4][0] != '127.0.0.1':
                return a[4][0]
    return hostname

# If we end up restarting a partially killed process, we'll try
# to pick up the ongoing heartbeats.
class ReuseAddrServer(socketserver.TCPServer):
    def restore_from_directories(self):
        """
        As our heartbeat server has been down, there's no accurate way
        to set the heartbeat times, so we just use the current time and
        hope for an update from the process before the heartbeat timeout.
        """
        if os.path.isdir(self.server_pid_dir):
            for file in os.listdir(self.server_pid_dir):
                with open(os.path.join(self.server_pid_dir, file)) as f:
                    (pid, pgid) = f.readline().strip().split()
                    if pid and pgid:
                        self.server_job_list[file] = [time.time(), pid, pgid, None]
        # for finished processes, though, we use the exit file time
        if os.path.isdir(self.server_exitrc_dir):
            for file in os.listdir(self.server_exitrc_dir):
                fn = os.path.join(self.server_exitrc_dir, file)
                with open(fn) as f:
                    rc = f.readline().strip()
                    if rc:
                        if file in self.server_job_list:
                            self.server_job_list[file][0] = os.path.getmtime(fn)
                            self.server_job_list[file][3] = rc
                        else:
                            self.server_job_list[file] = [os.path.getmtime(fn), None, None, rc]
    def __init__(self, server_address, RequestHandlerClass, server_directories):
        self.allow_reuse_address = True
        self.server_log_dir, self.server_pid_dir, self.server_exitrc_dir = server_directories
        # {jobid}: [heartbeat timestamp, pid, pgid, exit rc]
        self.server_job_list = dict()
        self.restore_from_directories()
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass)

def start_server(server_directories, hostname='', port=0):
    server = ReuseAddrServer((hostname, port), StatusServer, server_directories)
    hostname, port = server.socket.getsockname()
    hostname = get_localhost_ipaddress(hostname, port)
    # we call it a thread, but use a process to avoid the gil
    hb_thread = multiprocessing.Process(target=server.serve_forever)
    # set daemon to make sure the server shuts down when the main program finishes
    hb_thread.daemon = True
    hb_thread.start()
    log.debug('server ({}, {}) alive? {}'.format(hostname, port, hb_thread.is_alive()))
    return (hb_thread.authkey, (hostname, port))
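
# Usage sketch (the directories must already exist; see State.initialize()):
#
#   dirs = ('mypwatcher/log', 'mypwatcher/pid', 'mypwatcher/exit')
#   authkey, (host, port) = start_server(dirs)
#
# The server runs in a daemonic child process, so it dies with the caller;
# persistence across restarts comes from the pid/exit directories instead.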

class MetaJobClass(object):
    ext = {
        lang_python_exe: '.py',
        lang_bash_exe: '.bash',
    }
    def get_wrapper(self):
        return 'run-%s%s' % (self.mj.job.jobid, self.ext[self.mj.lang_exe])
    def get_pid(self):
        return self.mj.pid
    def kill(self, pid, sig):
        stored_pid = self.get_pid()
        if not pid:
            pid = stored_pid
            log.info('Not passed a pid to kill. Using stored pid:%s' % pid)
        if pid and stored_pid:
            if pid != stored_pid:
                log.error('pid:%s != stored_pid:%s' % (pid, stored_pid))
        os.kill(pid, sig)
    def __init__(self, mj):
        self.mj = mj

class State(object):
    def get_state_fn(self):
        return os.path.join(self.__directory, STATE_FN)
    def get_heartbeat_server(self):
        return self.top['server']
    def get_directory_wrappers(self):
        return os.path.join(self.__directory, 'wrappers')
    def get_directory_exits(self): # more like, emergency exits ;)
        return 'exits'
    def get_server_directories(self):
        return (os.path.join(self.__directory, 'log'),
                os.path.join(self.__directory, 'pid'),
                os.path.join(self.__directory, 'exit'))
    def submit_background(self, bjob):
        """Run a job in the background.
        Record it in state.
        """
        jobid = bjob.mjob.job.jobid
        self.top['jobs'][jobid] = bjob
        mji = MetaJobClass(bjob.mjob)
        script_fn = os.path.join(self.get_directory_wrappers(), mji.get_wrapper())
        exe = bjob.mjob.lang_exe
        with cd(bjob.mjob.job.rundir):
            bjob.submit(self, exe, script_fn) # Can raise
        log.info('Submitted backgroundjob=%s' % repr(bjob))
        self.top['jobids_submitted'].append(jobid)
        self.__changed = True
    def get_mji(self, jobid):
        mjob = self.top['jobs'][jobid].mjob
        return MetaJobClass(mjob)
    def get_bjob(self, jobid):
        return self.top['jobs'][jobid]
    def get_bjobs(self):
        return self.top['jobs']
    def get_mjobs(self):
        return {jobid: bjob.mjob for jobid, bjob in self.top['jobs'].items()}
    def add_deleted_jobid(self, jobid):
        self.top['jobids_deleted'].append(jobid)
        self.__changed = True
    def restart_server(self):
        hsocket = socket.socket()
        try: # see if our old server is still there
            hsocket.settimeout(60)
            hsocket.connect(self.get_heartbeat_server())
            socket_send(hsocket, 'A')
            line = socket_read(hsocket)
            hsocket.close()
            # the auth key is binary, so we have to eval to restore it
            if self.top['auth'] != eval(line): # not our server
                raise IOError()
        except IOError: # need to restart the server
            try: # first we try restarting at the same location
                old_hostname, old_port = self.get_heartbeat_server()
                self.top['auth'], self.top['server'] = start_server(
                    self.get_server_directories(), old_hostname, old_port)
            except Exception:
                log.exception('Failed to restore previous watcher server settings')
                try: # next we try restarting with the original arguments
                    old_hostname, old_port = self.top['server_args']
                    self.top['auth'], self.top['server'] = start_server(
                        self.get_server_directories(), old_hostname, old_port)
                except Exception:
                    self.top['auth'], self.top['server'] = start_server(
                        self.get_server_directories())
            self.__changed = True
    # If we restarted, orphaned jobs might have left exit files.
    # Update the server with exit info.
    def cleanup_exits(self):
        if os.path.exists(self.get_directory_exits()):
            for file in os.listdir(self.get_directory_exits()):
                exit_fn = os.path.join(self.get_directory_exits(), file)
                with open(exit_fn) as f:
                    rc = f.readline().strip()
                hsocket = socket.socket()
                hsocket.connect(self.get_heartbeat_server())
                #socket_send(hsocket, 'e {} {}'.format(jobid, rc)) #TODO: Must get jobid from somewhere
                hsocket.close()
                os.remove(exit_fn)
        else:
            makedirs(self.get_directory_exits())
    def restore_from_save(self, state_fn):
        with open(state_fn) as f:
            self.top = eval(f.read())
        self.restart_server()
        self.cleanup_exits()
        self.__initialized = True
        return True # signal success to initialize()
    def initialize(self, directory, hostname='', port=0):
        self.__directory = os.path.abspath(directory)
        state_fn = self.get_state_fn()
        if os.path.exists(state_fn):
            try: # try to restore state from the last save
                if self.restore_from_save(state_fn):
                    return
            except Exception:
                log.exception('Failed to use saved state "%s". Ignoring (and soon over-writing) current state.' % state_fn)
        makedirs(self.get_directory_wrappers())
        makedirs(self.get_directory_exits())
        for i in self.get_server_directories():
            makedirs(i)
        if directory != 'mypwatcher': # link mypwatcher to dir
            if os.path.exists('mypwatcher'):
                try:
                    if os.path.islink('mypwatcher'):
                        os.remove('mypwatcher')
                    else:
                        shutil.rmtree('mypwatcher')
                except OSError:
                    pass
            os.symlink(self.__directory, 'mypwatcher')
        self.top['server_args'] = (hostname, port)
        self.top['auth'], self.top['server'] = start_server(self.get_server_directories(), hostname, port)
        self.__initialized = True
        self.__changed = True
    def is_initialized(self):
        return self.__initialized
    def save(self):
        # TODO: RW Locks, maybe for the runtime of the whole program.
        # only save if there has been a change
        if self.__changed:
            content = pprint.pformat(self.top)
            fn = self.get_state_fn()
            with open(fn + '.tmp', 'w') as f:
                f.write(content)
            # as atomic as we can portably manage
            shutil.move(fn + '.tmp', fn)
            self.__changed = False
            log.debug('saved state to %s' % repr(os.path.abspath(fn)))
    def __init__(self):
        self.__initialized = False
        self.__changed = False
        self.top = dict()
        self.top['jobs'] = dict()
        self.top['jobids_deleted'] = list()
        self.top['jobids_submitted'] = list()

watcher_state = State()

def get_state(directory, hostname='', port=0):
    if not watcher_state.is_initialized():
        watcher_state.initialize(directory, hostname, port)
    return watcher_state

def Job_get_MetaJob(job, lang_exe=lang_bash_exe):
    return MetaJob(job, lang_exe=lang_exe)

def MetaJob_wrap(mjob, state):
    """Write wrapper contents to mjob.wrapper.
    """
    wdir = state.get_directory_wrappers()
    # the wrapped job may chdir(), so use an absolute path
    edir = os.path.abspath(state.get_directory_exits())
    metajob_rundir = mjob.job.rundir
    bash_template = """#!%(lang_exe)s
%(cmd)s
"""
    # We do not bother with 'set -e' here because this script is run either
    # in the background or via qsub.
    templates = {
        lang_python_exe: python_template, # defined at the bottom of this module
        lang_bash_exe: bash_template,
    }
    mji = MetaJobClass(mjob)
    wrapper_fn = os.path.join(wdir, mji.get_wrapper())
    heartbeat_server, heartbeat_port = state.get_heartbeat_server()
    rate = HEARTBEAT_RATE_S
    command = mjob.job.cmd
    jobid = mjob.job.jobid
    exit_sentinel_fn = os.path.join(edir, jobid)
    prog = 'python3 -m pwatcher.mains.network_heartbeat'
    heartbeat_wrapper_template = "{prog} --directory={metajob_rundir} --heartbeat-server={heartbeat_server} --heartbeat-port={heartbeat_port} --exit-dir={edir} --rate={rate} --jobid={jobid} {command} || echo 99 >| {exit_sentinel_fn}"
    # We write 99 into the exit-sentinel if the wrapper fails.
    wrapped = heartbeat_wrapper_template.format(**locals())
    log.debug('Wrapped "%s"' % wrapped)
    wrapped = templates[mjob.lang_exe] % dict(
        lang_exe=mjob.lang_exe,
        cmd=wrapped,
    )
    log.debug('Writing wrapper "%s"' % wrapper_fn)
    with open(wrapper_fn, 'w') as f:
        f.write(wrapped)
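
# For the default bash case, the generated wrapper looks roughly like this
# (paths and ids are illustrative; the real command is a single line):
#
#   #!/bin/bash
#   python3 -m pwatcher.mains.network_heartbeat --directory=/work/job0 \
#       --heartbeat-server=10.0.0.5 --heartbeat-port=41103 \
#       --exit-dir=/abs/path/exits --rate=600 --jobid=job0 \
#       bash run.sh || echo 99 >| /abs/path/exits/job0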

def background(script, exe='/bin/bash'):
    """Start script in the background (so it keeps going when we exit).
    Run in cwd.
    For now, stdout/stderr go to /dev/null; the heartbeat wrapper forwards
    script output to the server instead.
    Return pid.
    """
    args = [exe, script]
    sin = open(os.devnull)
    sout = open(os.devnull, 'w')
    serr = open(os.devnull, 'w')
    pseudo_call = '{exe} {script} 1>|stdout 2>|stderr & '.format(exe=exe, script=script)
    log.debug('dir: {!r}\ncall: {!r}'.format(os.getcwd(), pseudo_call))
    proc = subprocess.Popen(args, stdin=sin, stdout=sout, stderr=serr)
    pid = proc.pid
    log.debug('pid=%s pgid=%s sub-pid=%s' % (os.getpid(), os.getpgid(0), proc.pid))
    #checkcall = 'ls -l /proc/{}/cwd'.format(
    #    proc.pid)
    #system(checkcall, checked=True)
    return pid
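
# E.g. background('mypwatcher/wrappers/run-job0.bash') returns immediately
# with the child's pid; the child reports its own pid/pgid to the heartbeat
# server once the wrapper starts. (Names here are illustrative.)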

def qstripped(option):
    """Given a string of options, remove any -q foo.

    >>> qstripped('-xy -q foo -z bar')
    '-xy -z bar'
    """
    # For now, do not strip -qfoo
    vals = option.strip().split()
    while '-q' in vals:
        i = vals.index('-q')
        vals = vals[0:i] + vals[i+2:]
    return ' '.join(vals)

class MetaJobLocal(object):
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        #sge_option = self.mjob.job.options['sge_option']
        #assert sge_option is None, sge_option # might be set anyway
        pid = background(script_fn, exe=self.mjob.lang_exe)
    def kill(self, state, heartbeat=None):
        """Can raise.
        ('heartbeat' is ignored; kept for signature parity with the grid classes.)
        """
        hsocket = socket.socket()
        try:
            hsocket.connect(state.get_heartbeat_server())
            socket_send(hsocket, 'P {}'.format(self.mjob.job.jobid))
            line = socket_read(hsocket)
            hsocket.close()
        except IOError as e:
            log.exception('Failed to get pid/pgid for {}: {!r}'.format(self.mjob.job.jobid, e))
            return
        args = line.split(None, 2)
        pid = int(args[0])
        pgid = int(args[1])
        sig = signal.SIGKILL
        log.info('Sending signal(%s) to pgid=-%s (pid=%s) based on heartbeat server' % (sig, pgid, pid))
        try:
            os.kill(-pgid, sig)
        except Exception:
            log.exception('Failed to kill(%s) pgid=-%s for %r. Trying pid=%s' % (sig, pgid, self.mjob.job.jobid, pid))
            os.kill(pid, sig)
    def __repr__(self):
        return 'MetaJobLocal(%s)' % repr(self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob # PUBLIC

class MetaJobSge(object):
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        specific = self.specific
        #cwd = os.getcwd()
        job_name = self.get_jobname()
        sge_option = qstripped(self.mjob.job.options['sge_option'])
        job_queue = self.mjob.job.options['job_queue']
        # Add a shebang, in case shell_start_mode=unix_behavior.
        # https://github.com/PacificBiosciences/FALCON/pull/348
        with open(script_fn, 'r') as original:
            data = original.read()
        with open(script_fn, 'w') as modified:
            modified.write("#!/bin/bash" + "\n" + data)
        sge_cmd = 'qsub -N {job_name} -q {job_queue} {sge_option} {specific} -cwd -o stdout -e stderr -S {exe} {script_fn}'.format(
            **locals())
        system(sge_cmd, checked=True) # TODO: Capture q-jobid
    def kill(self, state, heartbeat):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_cmd = 'qdel {}'.format(job_name)
        system(sge_cmd, checked=False)
    def get_jobname(self):
        """Some systems are limited to 15 characters, so for now we simply truncate the jobid.
        TODO: Choose a sequential jobname and record it. Priority: low, since collisions are very unlikely.
        """
        return self.mjob.job.jobid[:15]
    def __repr__(self):
        return 'MetaJobSge(%s)' % repr(self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob
        self.specific = '-V' # pass enV; '-j y' => combine out/err
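
# With illustrative values (job_name='job0', job_queue='default',
# sge_option='-pe smp 4'), submit() runs roughly:
#
#   qsub -N job0 -q default -pe smp 4 -V -cwd -o stdout -e stderr \
#       -S /bin/bash mypwatcher/wrappers/run-job0.bash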

class MetaJobPbs(object):
    """
    usage: qsub [-a date_time] [-A account_string] [-c interval]
        [-C directive_prefix] [-e path] [-h ] [-I [-X]] [-j oe|eo] [-J X-Y[:Z]]
        [-k o|e|oe] [-l resource_list] [-m mail_options] [-M user_list]
        [-N jobname] [-o path] [-p priority] [-q queue] [-r y|n]
        [-S path] [-u user_list] [-W otherattributes=value...]
        [-v variable_list] [-V ] [-z] [script | -- command [arg1 ...]]
    """
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        specific = self.specific
        #cwd = os.getcwd()
        job_name = self.get_jobname()
        sge_option = qstripped(self.mjob.job.options['sge_option'])
        job_queue = self.mjob.job.options['job_queue']
        # Add a shebang, in case shell_start_mode=unix_behavior.
        # https://github.com/PacificBiosciences/FALCON/pull/348
        with open(script_fn, 'r') as original:
            data = original.read()
        with open(script_fn, 'w') as modified:
            modified.write("#!/bin/bash" + "\n" + data)
        sge_cmd = 'qsub -N {job_name} -q {job_queue} {sge_option} {specific} -o /dev/null -e /dev/null -S {exe} {script_fn}'.format(
            **locals())
        system(sge_cmd, checked=True) # TODO: Capture q-jobid
    def kill(self, state, heartbeat):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_cmd = 'qdel {}'.format(job_name)
        system(sge_cmd, checked=False)
    def get_jobname(self):
        """Some systems are limited to 15 characters, so for now we simply truncate the jobid.
        TODO: Choose a sequential jobname and record it. Priority: low, since collisions are very unlikely.
        """
        return self.mjob.job.jobid[:15]
    def __repr__(self):
        return 'MetaJobPbs(%s)' % repr(self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob
        self.specific = '-V' # pass enV; '-j y' => combine out/err

class MetaJobTorque(object):
    # http://docs.adaptivecomputing.com/torque/4-0-2/help.htm#topics/commands/qsub.htm
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        specific = self.specific
        job_name = self.get_jobname()
        sge_option = qstripped(self.mjob.job.options['sge_option'])
        job_queue = self.mjob.job.options['job_queue']
        cwd = os.getcwd()
        # Add a shebang, in case shell_start_mode=unix_behavior.
        # https://github.com/PacificBiosciences/FALCON/pull/348
        with open(script_fn, 'r') as original:
            data = original.read()
        with open(script_fn, 'w') as modified:
            modified.write("#!/bin/bash" + "\n" + data)
        sge_cmd = 'qsub -N {job_name} -q {job_queue} {sge_option} {specific} -d {cwd} -o stdout -e stderr -S {exe} {script_fn}'.format(
            **locals())
        system(sge_cmd, checked=True) # TODO: Capture q-jobid
    def kill(self, state, heartbeat):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_cmd = 'qdel {}'.format(job_name)
        system(sge_cmd, checked=False)
    def get_jobname(self):
        """Some systems are limited to 15 characters, so for now we simply truncate the jobid.
        TODO: Choose a sequential jobname and record it. Priority: low, since collisions are very unlikely.
        """
        return self.mjob.job.jobid[:15]
    def __repr__(self):
        return 'MetaJobTorque(%s)' % repr(self.mjob)
    def __init__(self, mjob):
        # (No super().__init__() call: the base is object, which takes no args.)
        self.specific = '-V' # pass enV; '-j oe' => combine out/err
        self.mjob = mjob

class MetaJobSlurm(object):
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_option = qstripped(self.mjob.job.options['sge_option'])
        job_queue = self.mjob.job.options['job_queue']
        cwd = os.getcwd()
        sge_cmd = 'sbatch -J {job_name} -p {job_queue} {sge_option} -D {cwd} -o stdout -e stderr --wrap="{exe} {script_fn}"'.format(
            **locals())
        # "By default all environment variables are propagated."
        # http://slurm.schedmd.com/sbatch.html
        system(sge_cmd, checked=True) # TODO: Capture sbatch-jobid
    def kill(self, state, heartbeat):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_cmd = 'scancel -n {}'.format(job_name)
        system(sge_cmd, checked=False)
    def get_jobname(self):
        """Some systems are limited to 15 characters, so for now we simply truncate the jobid.
        TODO: Choose a sequential jobname and record it. Priority: low, since collisions are very unlikely.
        """
        return self.mjob.job.jobid[:15]
    def __repr__(self):
        return 'MetaJobSlurm(%s)' % repr(self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob

class MetaJobLsf(object):
    def submit(self, state, exe, script_fn):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_option = qstripped(self.mjob.job.options['sge_option'])
        job_queue = self.mjob.job.options['job_queue']
        sge_cmd = 'bsub -J {job_name} -q {job_queue} {sge_option} -o stdout -e stderr "{exe} {script_fn}"'.format(
            **locals())
        # "Sets the user's execution environment for the job, including the
        # current working directory, file creation mask, and all environment
        # variables, and sets LSF environment variables before starting the job."
        system(sge_cmd, checked=True) # TODO: Capture q-jobid
    def kill(self, state, heartbeat):
        """Can raise.
        """
        job_name = self.get_jobname()
        sge_cmd = 'bkill -J {}'.format(job_name)
        system(sge_cmd, checked=False)
    def get_jobname(self):
        """Some systems are limited to 15 characters, so for now we simply truncate the jobid.
        TODO: Choose a sequential jobname and record it. Priority: low, since collisions are very unlikely.
        """
        return self.mjob.job.jobid[:15]
    def __repr__(self):
        return 'MetaJobLsf(%s)' % repr(self.mjob)
    def __init__(self, mjob):
        self.mjob = mjob

def cmd_run(state, jobids, job_type, job_queue):
    """For each job-description in 'jobids' (a dict of jobid -> desc, giving
    the command, run-dir, and options), wrap the command and submit it,
    locally in the background or via the requested grid system.
    """
    jobs = dict()
    submitted = list()
    result = {'submitted': submitted}
    for jobid, desc in jobids.items():
        assert 'cmd' in desc
        options = {}
        if 'job_queue' not in desc:
            raise Exception(pprint.pformat(desc))
        for k in ('sge_option', 'job_type', 'job_queue'): # extras to be stored
            if k in desc:
                if desc[k]:
                    options[k] = desc[k]
        if options.get('sge_option', None) is None:
            # This way we can always safely include it.
            options['sge_option'] = ''
        if not options.get('job_queue'):
            options['job_queue'] = job_queue
        if not options.get('job_type'):
            options['job_type'] = job_type
        jobs[jobid] = Job(jobid, desc['cmd'], desc['rundir'], options)
    log.debug('jobs:\n%s' % pprint.pformat(jobs))
    for jobid, job in jobs.items():
        desc = jobids[jobid]
        log.info('starting job %s' % pprint.pformat(job))
        mjob = Job_get_MetaJob(job)
        MetaJob_wrap(mjob, state)
        options = job.options
        my_job_type = desc.get('job_type')
        if my_job_type is None:
            my_job_type = job_type
        my_job_type = my_job_type.upper()
        if my_job_type != 'LOCAL':
            if ' ' in job_queue:
                msg = 'For pwatcher=network_based, job_queue cannot contain spaces:\n job_queue={!r}\n job_type={!r}'.format(
                    job_queue, my_job_type)
                raise Exception(msg)
        if my_job_type == 'LOCAL':
            bjob = MetaJobLocal(mjob)
        elif my_job_type == 'SGE':
            bjob = MetaJobSge(mjob)
        elif my_job_type == 'PBS':
            bjob = MetaJobPbs(mjob)
        elif my_job_type == 'TORQUE':
            bjob = MetaJobTorque(mjob)
        elif my_job_type == 'SLURM':
            bjob = MetaJobSlurm(mjob)
        elif my_job_type == 'LSF':
            bjob = MetaJobLsf(mjob)
        else:
            raise Exception('Unknown my_job_type=%s' % repr(my_job_type))
        try:
            state.submit_background(bjob)
            submitted.append(jobid)
        except Exception:
            log.exception('In pwatcher.network_based.cmd_run(), failed to submit background-job:\n{!r}'.format(
                bjob))
            # The caller is responsible for deciding what to do about
            # job-submission failures. Re-try, maybe?
    return result
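
# The expected shape of 'jobids' is a dict of job descriptions, e.g.
# (values here are illustrative):
#
#   {
#     'job0': {'cmd': 'bash run.sh', 'rundir': '/work/job0',
#              'job_type': 'LOCAL', 'job_queue': 'default',
#              'sge_option': ''},
#   }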

def system(call, checked=False):
    log.info('!{}'.format(call))
    rc = os.system(call)
    if checked and rc:
        raise Exception('{} <- {!r}'.format(rc, call))

_warned = dict()
def warnonce(hashkey, msg):
    if hashkey in _warned:
        return
    log.warning(msg)
    _warned[hashkey] = True

def get_status(state, jobid):
    hsocket = socket.socket()
    try:
        hsocket.connect(state.get_heartbeat_server())
        socket_send(hsocket, 'Q {}'.format(jobid))
        line = socket_read(hsocket)
        hsocket.close()
    except IOError: # server can't be reached for comment
        return 'UNKNOWN'
    if not line: # bad formatting
        return 'UNKNOWN'
    args = line.split(None, 2) # status, arg
    if len(args) != 2:
        return 'UNKNOWN'
    if args[0] == 'EXIT':
        return line
    elif args[0] == 'RUNNING':
        if 3*HEARTBEAT_RATE_S < int(args[1]):
            msg = 'DEAD job? 3*{} < {} for {!r}'.format(
                HEARTBEAT_RATE_S, args[1], jobid)
            log.debug(msg)
            warnonce(jobid, msg)
            return 'DEAD'
        else:
            return 'RUNNING'
    return 'UNKNOWN'
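
# With HEARTBEAT_RATE_S = 600, a job is reported DEAD once the server has
# seen no heartbeat for more than 1800 s (three missed beats): a reply of
# 'RUNNING 2000' maps to 'DEAD', while 'RUNNING 1200' is still 'RUNNING'.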

def cmd_query(state, which, jobids):
    """Return the state of the named jobids.
    """
    result = dict()
    jobstats = dict()
    result['jobids'] = jobstats
    for jobid in find_jobids(state, which, jobids):
        status = get_status(state, jobid)
        log.debug('Status %s for jobid:%s' % (status, jobid))
        jobstats[jobid] = status
    return result

def get_jobid2pid(pid2mjob):
    result = dict()
    for pid, mjob in pid2mjob.items():
        jobid = mjob.job.jobid
        result[jobid] = pid
    return result

def find_jobids(state, which, jobids):
    """Yield jobids.
    If which=='list', then query the jobs listed as jobids.
    If which=='known', then query all known jobs.
    If which=='infer', then query all jobs with heartbeats.
    These are not quite finished, but already useful.
    """
    #log.debug('find_jobids for which=%s, jobids=%s' %(which, pprint.pformat(jobids)))
    if which == 'infer':
        hsocket = socket.socket()
        try:
            hsocket.connect(state.get_heartbeat_server())
            socket_send(hsocket, 'L')
            line = socket_read(hsocket)
            hsocket.close()
        except IOError as e:
            log.exception('In find_jobids(), unable to infer jobid list: {!r}'.format(e))
            return # otherwise 'line' would be unbound below
        for jobid in line.split():
            yield jobid
    elif which == 'known':
        jobid2mjob = state.get_mjobs()
        for jobid, mjob in jobid2mjob.items():
            yield jobid
    elif which == 'list':
        #jobid2mjob = state.get_mjobs()
        #log.debug('jobid2mjob:\n%s' %pprint.pformat(jobid2mjob))
        for jobid in jobids:
            #log.debug('jobid=%s; jobids=%s' %(repr(jobid), repr(jobids)))
            #if jobid not in jobid2mjob:
            #    log.info("jobid=%s is not known. Might have been deleted already." %jobid)
            yield jobid
    else:
        raise Exception('which=%s' % repr(which))

def delete_jobid(state, jobid, keep=False):
    """
    Kill the job with this heartbeat.
    (If there is no heartbeat, then the job is already gone.)
    Record the deletion in state, and drop the jobid from the server.
    """
    try:
        bjob = state.get_bjob(jobid)
    except Exception:
        log.exception('In delete_jobid(), unable to find batchjob for %s' % (jobid))
        # TODO: Maybe provide a default grid type, so we can attempt to delete anyway?
        return
    try:
        bjob.kill(state, jobid)
    except Exception as exc:
        log.debug('Failed to kill job for jobid {!r}: {!r}'.format(
            jobid, exc))
    state.add_deleted_jobid(jobid)
    # For now, keep it in the 'jobs' table.
    hsocket = socket.socket()
    try:
        hsocket.connect(state.get_heartbeat_server())
        socket_send(hsocket, 'd {}'.format(jobid))
        hsocket.close()
        log.debug('Removed jobid=%s' % repr(jobid))
    except IOError as e:
        log.debug('Cannot remove jobid {}: {!r}'.format(jobid, e))

def cmd_delete(state, which, jobids):
    """Kill designated jobs, including (hopefully) their
    entire process groups.
    If which=='list', then kill all jobs listed as jobids.
    If which=='known', then kill all known jobs.
    If which=='infer', then kill all jobs with heartbeats.
    Remove those heartbeat files.
    """
    log.debug('Deleting jobs for jobids from %s (%s)' % (
        which, repr(jobids)))
    for jobid in find_jobids(state, which, jobids):
        delete_jobid(state, jobid)

def makedirs(path):
    if not os.path.isdir(path):
        os.makedirs(path)

def readjson(ifs):
    """Del keys that start with ~.
    That lets us have trailing commas on all other lines.
    """
    content = ifs.read()
    log.debug('content:%s' % repr(content))
    jsonval = json.loads(content)
    #pprint.pprint(jsonval)
    def striptildes(subd):
        if not isinstance(subd, dict):
            return
        for k, v in list(subd.items()):
            if k.startswith('~'):
                del subd[k]
            else:
                striptildes(v)
    striptildes(jsonval)
    #pprint.pprint(jsonval)
    return jsonval
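
# Example input, exercising the tilde convention (values illustrative):
#
#   {
#     "jobids": {
#       "job0": {"cmd": "bash run.sh", "rundir": "/work/job0",
#                "job_queue": "default", "sge_option": "",
#                "~comment": "stripped before use"},
#       "~trailing": {}
#     },
#     "job_type": "LOCAL",
#     "job_defaults_dict": {},
#     "~also-stripped": 0
#   }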

class ProcessWatcher(object):
    def run(self, jobids, job_type, job_defaults_dict):
        #import traceback; log.debug(''.join(traceback.format_stack()))
        log.debug('run(jobids={}, job_type={}, job_defaults_dict={})'.format(
            '<%s>' % len(jobids), job_type, job_defaults_dict))
        return cmd_run(self.state, jobids, job_type, job_defaults_dict)
    def query(self, which='list', jobids=[]):
        log.debug('query(which={!r}, jobids={})'.format(
            which, '<%s>' % len(jobids)))
        return cmd_query(self.state, which, jobids)
    def delete(self, which='list', jobids=[]):
        log.debug('delete(which={!r}, jobids={})'.format(
            which, '<%s>' % len(jobids)))
        return cmd_delete(self.state, which, jobids)
    def __init__(self, state):
        raise Exception('network_based pwatcher is currently broken.')
        self.state = state

def get_process_watcher(directory):
    state = get_state(directory)
    #log.debug('state =\n%s' % pprint.pformat(state.top))
    return ProcessWatcher(state)
    #State_save(state)

@contextlib.contextmanager
def process_watcher(directory, hostname='', port=0):
    """This will (someday) hold a lock, so that
    the State can be written safely at the end.
    """
    state = get_state(directory, hostname, port)
    #log.debug('state =\n%s' % pprint.pformat(state.top))
    yield ProcessWatcher(state)
    # TODO: Sometimes, maybe we should not save state.
    # Or maybe we *should* on exception.
    state.save()
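
# Sketch of the intended use (note: ProcessWatcher.__init__ currently
# raises on purpose, so this is aspirational until the module is fixed):
#
#   with process_watcher('mypwatcher') as watcher:
#       watcher.run(jobids=jobs, job_type='LOCAL', job_defaults_dict={})
#   # state.save() runs when the with-block exits without raising.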

def main(prog, cmd, state_dir='mainpwatcher', argsfile=None):
    logging.basicConfig()
    logging.getLogger().setLevel(logging.NOTSET)
    log.warning('logging basically configured')
    log.debug('debug mode on')
    assert cmd in ['run', 'query', 'delete']
    ifs = sys.stdin if not argsfile else open(argsfile)
    argsdict = readjson(ifs)
    log.info('argsdict =\n%s' % pprint.pformat(argsdict))
    with process_watcher(state_dir) as watcher:
        result = getattr(watcher, cmd)(**argsdict)
        if result is not None:
            print(pprint.pformat(result))

# With bash, we would need to set the session, rather than
# the process group. That's not ideal, but this is here for reference.
# http://stackoverflow.com/questions/6549663/how-to-set-process-group-of-a-shell-script
#
bash_template = """#!%(lang_exe)s
cmd='%(cmd)s'
"$cmd"
"""

# perl might be better, for efficiency.
# But we will use python for now.
#
python_template = r"""#!%(lang_exe)s
import threading, time, os, shlex, sys, subprocess, socket

cmd='%(cmd)s'
heartbeat_server=('%(heartbeat_server)s', %(heartbeat_port)s)
jobid='%(jobid)s'
sleep_s=%(sleep_s)s
cwd='%(cwd)s'

os.chdir(cwd)

def socket_send(socket, message):
    socket.sendall('{}\0'.format(message).encode('ascii'))

def log(msg):
    hsocket = socket.socket()
    try:
        hsocket.connect(heartbeat_server)
        socket_send(hsocket, 's {} {}\n'.format(jobid, msg))
        hsocket.close()
    except IOError: # better to miss a line than terminate
        pass

def thread_heartbeat():
    pid = os.getpid()
    pgid = os.getpgid(0)
    hsocket = socket.socket()
    try:
        hsocket.connect(heartbeat_server)
        socket_send(hsocket, 'i {} {} {}'.format(jobid, pid, pgid))
        hsocket.close()
    except IOError: # hope it's a temporary error
        pass
    while True:
        time.sleep(sleep_s)
        hsocket = socket.socket()
        try:
            hsocket.connect(heartbeat_server)
            socket_send(hsocket, 'h {}'.format(jobid))
            hsocket.close()
        except IOError: # hope it's a temporary error
            pass

def start_heartbeat():
    hb = threading.Thread(target=thread_heartbeat)
    log('alive? {}'.format(hb.is_alive()))
    hb.daemon = True
    hb.start()
    return hb

def main():
    log('cwd:{!r}'.format(os.getcwd()))
    log('before: pid={} pgid={}'.format(os.getpid(), os.getpgid(0)))
    try:
        os.setpgid(0, 0)
    except OSError as e:
        log('Unable to set pgid. Possibly a grid job? Hopefully there will be no dangling processes when killed: {}'.format(
            repr(e)))
    log('after: pid={} pgid={}'.format(os.getpid(), os.getpgid(0)))
    hb = start_heartbeat()
    log('alive? {} pid={} pgid={}'.format(hb.is_alive(), os.getpid(), os.getpgid(0)))
    sp = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    with sp.stdout as f:
        for line in iter(f.readline, b''):
            log(line.decode('ascii', 'replace'))
    rc = sp.wait()
    # sys.exit(rc) # No-one would see this anyway.
    hsocket = socket.socket()
    try:
        hsocket.connect(heartbeat_server)
        socket_send(hsocket, 'e {} {}'.format(jobid, rc))
        hsocket.close()
    except IOError as e:
        log('could not update heartbeat server with exit status: {} {}: {!r}'.format(jobid, rc, e))
    if rc:
        raise Exception('{} <- {!r}'.format(rc, cmd))

main()
"""

if __name__ == "__main__":
    import pdb
    pdb.set_trace()
    main(*sys.argv) # pylint: disable=no-value-for-parameter