simple_pwatcher_bridge.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. from .util import (mkdirs, system, touch, run, cd)
  2. import pwatcher.blocking
  3. import pwatcher.fs_based
  4. import pwatcher.network_based
  5. import networkx
  6. import networkx.algorithms.dag #import (topological_sort, is_directed_acyclic_graph)
  7. import hashlib
  8. import json
  9. import logging
  10. import os
  11. import pprint
  12. import random
  13. import re
  14. import sys
  15. import tempfile
  16. import time
  17. import traceback
  LOG = logging.getLogger(__name__)


  def generate_jobid(node, script_fn):
      """Return a short job-id derived from the content of a script.

      The id is 'P' followed by the first 14 hex digits of the MD5 checksum
      of the file at script_fn, so it is always 15 characters long.
      We truncate the job_name to 15 chars for the sake of some job systems.

      node      -- unused here; accepted so that every entry of
                   JOBID_GENERATORS can be called the same way.
      script_fn -- path of the script whose bytes are hashed.

      Raises whatever open() raises if script_fn cannot be read.
      """
      # Use a context manager so the file handle is closed deterministically
      # instead of whenever the garbage collector gets to it.
      with open(script_fn, 'rb') as ifs:
          script_content = ifs.read()
      checksum = hashlib.md5(script_content).hexdigest()
      return 'P' + checksum[0:14]
  def generate_jobid_alt_given_checksum(script_fn, checksum):
      """Build a job-id from the stage digit, the run-dir name and a checksum.

      Note: Unable to run job: denied: "0_raw_fofn_abs_" is not a valid object name (cannot start with a digit).
      So we need a prefix.
      >>> generate_jobid_alt_given_checksum('4-quiver/000000F_002/run.sh', '123')
      'P4_000000F_002_123'
      >>> generate_jobid_alt_given_checksum('4-quiver/quiver_scatter/000000F_002/run.sh', '456')
      'P4_000000F_002_456'
      """
      # TODO: Consider intermediate directories.
      # Only the directory directly containing the script contributes a name.
      rundir_name = os.path.basename(os.path.dirname(script_fn))
      pieces = ['P' + get_stage_char(script_fn), rundir_name, checksum]
      return alphanum('_'.join(pieces)) # without length limit
  def generate_jobid_alt(node, script_fn):
      """Return a job-id built from the script's path and its MD5 checksum.

      dgordon suggests this as a preferable alternative to generate_jobid().
      The actual formatting is delegated to
      generate_jobid_alt_given_checksum().

      node      -- unused; kept for a uniform generator signature.
      script_fn -- path of the script; its bytes are hashed and its path
                   is used for the readable part of the id.
      """
      # Close the file promptly rather than leaking the handle.
      with open(script_fn, 'rb') as ifs:
          checksum = hashlib.md5(ifs.read()).hexdigest()
      return generate_jobid_alt_given_checksum(script_fn, checksum)


  # Indexed by the integer 'jobid_generator' given to PwatcherTaskQueue.
  JOBID_GENERATORS = [generate_jobid, generate_jobid_alt]
  def alphanum(foo):
      """Return foo with every '/' and every '-' replaced by '_'.

      >>> alphanum('/foo-bar/')
      '_foo_bar_'
      """
      return re.sub('[/-]', '_', foo)
  # A lone digit (at a word boundary) followed by '-', word characters and '/'.
  re_stage_char = re.compile(r'\b(\d)-\w+/')


  def get_stage_char(fn):
      """Return the stage digit found in fn, or 'P' when there is none.

      >>> get_stage_char('0-hi/x/y')
      '0'
      >>> get_stage_char('x/y')
      'P'
      """
      found = re_stage_char.search(fn)
      return found.group(1) if found else 'P'
  def endrun(thisnode, status):
      """Record on thisnode the outcome described by a watcher status string.

      By convention for now, status is one of:
          'DEAD'
          'UNSUBMITTED' (a pseudo-status defined in the ready-loop of alive())
          'EXIT rc'
      Only the first whitespace-separated word selects the case; for 'EXIT'
      the second word is parsed as the integer exit-code.
      Any other first word raises Exception.
      """
      fields = status.split()
      kind = fields[0]
      if kind == 'EXIT':
          exit_code = int(fields[1])
          if exit_code == 0:
              thisnode.setSatisfied(True)
              return
          LOG.error('Task {} failed with exit-code={}'.format(thisnode, exit_code))
          thisnode.setSatisfied(False)
          return
      if kind == 'DEAD':
          LOG.warning(''.join(traceback.format_stack()))
          LOG.error('Task {}\n is DEAD, meaning no HEARTBEAT, but this can be a race-condition. If it was not killed, then restarting might suffice. Otherwise, you might have excessive clock-skew.'.format(thisnode))
          #thisnode.setSatisfied(False) # I think we can leave it unsatisfied. Not sure what happens on retry though.
          return
      if kind == 'UNSUBMITTED':
          LOG.warning(''.join(traceback.format_stack()))
          LOG.error('Task {}\n is UNSUBMITTED, meaning job-submission somehow failed. Possibly a re-start would work. Otherwise, you need to investigate.'.format(thisnode))
          thisnode.setSatisfied(False)
          return
      raise Exception('Unexpected status {!r}'.format(kind))
  class PwatcherTaskQueue(object):
      """Submit Nodes to a process-watcher and poll it for finished jobs.

      State kept per instance:
        running   -- set of jobids submitted and not yet seen as finished
        known     -- dict jobid -> Node, for every job ever enqueued
        to_report -- Nodes finished since the last check_done()
      """
      def __init__(self, watcher, section_job=None, job_type='local', job_defaults_dict=None, jobid_generator=0):
          """
          watcher           -- object providing run(), query() and delete().
          section_job       -- stored (as {} when falsy); not used by the
                               methods of this class.
          job_type          -- default job type forwarded to watcher.run().
          job_defaults_dict -- defaults merged under each task's own job_dict;
                               None is treated as an empty dict.
          jobid_generator   -- index into JOBID_GENERATORS.
          """
          self.watcher = watcher
          self.__section_job = section_job if section_job else {}
          self.__job_type = job_type
          # enque() copies this with dict(...), which would raise TypeError
          # for the default None, so normalize it here.
          self.__job_defaults_dict = job_defaults_dict if job_defaults_dict is not None else {}
          self.__running = set() # jobids
          self.__known = dict() # jobid -> Node
          self.__to_report = list() # Nodes
          self.__generate_jobid = JOBID_GENERATORS[jobid_generator]

      def enque(self, nodes):
          """This should be an injected dependency.
          Yield any Nodes that could not be submitted.

          For each Node, execute() is called to write its wrapper script,
          a jobid is generated from that script, and one watcher.run() call
          submits the whole batch. A Node whose execute() returns nothing
          raises Exception.

          Note that this is a generator: nothing above happens until the
          result is iterated.
          """
          # Start all "nodes".
          # Note: It is safe to run this block always, but we save a
          # call to pwatcher with 'if ready'.
          LOG.debug('enque nodes:\n%s' %pprint.pformat(nodes))
          jobids = dict()
          for node in nodes:
              #node.satisfy() # This would do the job without a process-watcher.
              generated_script_fn = node.execute() # misnomer; this only dumps task.json now
              if not generated_script_fn:
                  raise Exception('Missing generated_script_fn for Node {}'.format(node))
              jobid = self.__generate_jobid(node, generated_script_fn)
              self.__known[jobid] = node

              # The job runs the script by basename from its own directory.
              rundir, basename = os.path.split(os.path.abspath(generated_script_fn))
              cmd = '/bin/bash {}'.format(basename)
              job_type = node.pypetask.parameters.get('job_type', None)
              dist = node.pypetask.dist # must always exist now
              # Task-specific settings override the workflow-wide defaults.
              job_dict = dict(self.__job_defaults_dict)
              job_dict.update(dist.job_dict)
              LOG.debug('In rundir={!r}, job_dict={!r}'.format(
                  rundir, job_dict))
              job_local = int(dist.local) # bool->1/0, easily serialized
              jobids[jobid] = {
                  'cmd': cmd,
                  'rundir': rundir,
                  # These are optional:
                  'job_type': job_type,
                  'job_dict': job_dict,
                  'job_local': job_local,
              }
          # Also send the default type and queue-name.
          watcher_args = {
              'jobids': jobids,
              'job_type': self.__job_type,
              'job_defaults_dict': self.__job_defaults_dict,
          }
          result = self.watcher.run(**watcher_args)
          LOG.debug('Result of watcher.run()={}'.format(repr(result)))
          submitted = result['submitted']
          self.__running.update(submitted)
          for jobid in (set(jobids.keys()) - set(submitted)):
              yield self.__known[jobid] # TODO: What should be the status for these submission failures?

      def check_done(self):
          """Return a list of the Nodes which have finished since the last check.

          A job counts as finished when its status starts with 'EXIT' or
          'DEAD'; endrun() is then applied to its Node. If endrun() raises,
          the exception is logged and re-raised.
          """
          watcher_args = {
              'jobids': list(self.__running),
              'which': 'list',
          }
          q = self.watcher.query(**watcher_args)
          #LOG.debug('In check_done(), result of query:%s' %repr(q))
          for jobid, status in q['jobids'].items():
              if not status.startswith(('EXIT', 'DEAD')):
                  continue
              self.__running.remove(jobid)
              node = self.__known[jobid]
              self.__to_report.append(node)
              try:
                  endrun(node, status)
              except Exception:
                  msg = 'Failed to clean-up FakeThread: jobid={} status={}'.format(jobid, repr(status))
                  LOG.exception(msg)
                  raise
          # Hand over the accumulated Nodes and start a fresh list.
          to_report = list(self.__to_report)
          self.__to_report = list()
          return to_report

      def terminate(self):
          """Ask the watcher to delete all jobs still recorded as running."""
          watcher_args = {
              'jobids': list(self.__running),
              'which': 'known',
          }
          q = self.watcher.delete(**watcher_args)
          LOG.debug('In notifyTerminate(), result of delete:%s' %repr(q))
  def get_unsatisfied_subgraph(g):
      """Return a new DiGraph copied from g without the satisfied nodes.

      A node n is dropped when n.satisfied() is true; g itself is not modified.
      """
      already_done = [n for n in g.nodes() if n.satisfied()]
      unsatg = networkx.DiGraph(g)
      unsatg.remove_nodes_from(already_done)
      return unsatg
  def find_all_ancestors(g):
      """Return a dict mapping every node of g to the set of all its ancestors.

      An ancestor of a node is any node from which it can be reached by
      following edges, i.e. its predecessors, their predecessors, and so on.
      Nodes without predecessors map to an empty set.

      g must provide nodes() and predecessors(node). The recursion assumes
      that g has no cycles.
      """
      ancestors = dict()
      def get_ancestors(node):
          """memoized"""
          if node not in ancestors:
              myancestors = set()
              # predecessors() exists in networkx 1.x and 2.x, whereas
              # predecessors_iter() was removed in 2.0.
              for pred in g.predecessors(node):
                  # The direct predecessor is itself an ancestor; without
                  # this line every set would stay empty.
                  myancestors.add(pred)
                  myancestors.update(get_ancestors(pred))
              ancestors[node] = myancestors
          return ancestors[node]
      for node in g.nodes():
          get_ancestors(node)
      return ancestors
  def find_next_ready_and_remove(g, node):
      """Return the successors of node that depend on nothing else, then drop node.

      Given a recently satisfied node, a successor is ready when its
      in_degree is 1, i.e. node is its only remaining predecessor.
      node is removed from g immediately after the successors are examined.
      """
      ready = set(succ for succ in g.successors(node) if g.in_degree(succ) == 1)
      g.remove_node(node)
      return ready
  def find_all_roots(g):
      """Return the set of nodes in g whose in_degree is 0 (no predecessors)."""
      return set(candidate for candidate in g if g.in_degree(candidate) == 0)
  class Workflow(object):
      """A DAG of Nodes (one per PypeTask) and the loop which runs them.

      Tasks are registered with addTask()/addTasks(); refreshTargets() then
      submits every unsatisfied Node through self.tq as soon as all of the
      Nodes it needs are satisfied, with at most max_jobs submitted at once.
      """
      def __init__(self, watcher, job_type, job_defaults_dict, max_jobs, use_tmpdir, squash, jobid_generator,
              pre_script=None,
      ):
          """
          watcher, job_type, job_defaults_dict, jobid_generator
                      -- forwarded to PwatcherTaskQueue.
          max_jobs    -- positive int; max number of concurrently submitted jobs.
          use_tmpdir  -- default for Nodes whose task does not set its own.
          squash      -- only stored on the instance here.
          pre_script  -- command(s) to run at top of bash scripts; when falsy,
                         the PYPEFLOW_PRE environment variable is used instead.
          """
          self.graph = networkx.DiGraph()
          # TODO: Inject PwatcherTaskQueue
          self.tq = PwatcherTaskQueue(watcher=watcher, job_type=job_type, job_defaults_dict=job_defaults_dict,
                  jobid_generator=jobid_generator,
          )
          assert isinstance(max_jobs, int)
          assert max_jobs > 0, 'max_jobs needs to be set. If you use the "blocking" process-watcher, it is also the number of threads.'
          # The setter reads the previous value, so it must exist first.
          self.__max_njobs = None
          self.max_jobs = max_jobs
          self.sentinels = dict() # sentinel_done_fn -> Node
          self.pypetask2node = dict()
          self.use_tmpdir = use_tmpdir
          if not pre_script:
              pre_script = os.environ.get('PYPEFLOW_PRE', None)
              if pre_script:
                  LOG.warning('Found PYPEFLOW_PRE in env; using that for pre_script.')
          self.pre_script = pre_script # command(s) to run at top of bash scripts
          if self.pre_script:
              LOG.info('At top of bash scripts, we will run:\n{}'.format(self.pre_script))
          self.squash = squash # This really should depend on each Task, but for now a global is fine.
          # For small genomes, serial tasks should always be squashed.

      @property
      def max_jobs(self):
          """The max number of concurrently submitted jobs."""
          return self.__max_njobs

      @max_jobs.setter
      def max_jobs(self, val):
          pre = self.__max_njobs
          if pre != int(val):
              self.__max_njobs = int(val)
              LOG.info('Setting max_jobs to {}; was {}'.format(self.__max_njobs, pre))

      def _create_node(self, pypetask):
          """Given a PypeTask, return a Node for our Workflow graph.
          Recursively create more nodes based on 'pypetask.inputs',
          record them as 'node.needs', and update the global pypetask2node table.
          """
          needs = set()
          use_tmpdir = self.use_tmpdir
          pre_script = self.pre_script
          # A task-level setting overrides the workflow-level default.
          if pypetask.dist.use_tmpdir is not None:
              use_tmpdir = pypetask.dist.use_tmpdir
          node = PypeNode(pypetask.name, pypetask.wdir, pypetask, needs, use_tmpdir, pre_script)
          self.pypetask2node[pypetask] = node
          # 'needs' is the very set held by the node, so it is filled in place.
          for key, plf in pypetask.inputs.items():
              if plf.producer is None:
                  continue
              if plf.producer not in self.pypetask2node:
                  self._create_node(plf.producer)
              needed_node = self.pypetask2node[plf.producer]
              needs.add(needed_node)
          LOG.debug('New {!r} needs {!r}'.format(node, needs))
          return node

      def addTask(self, pypetask):
          """Add the Node for pypetask, and the edges from the Nodes it needs.

          Raise Exception if the Node's sentinel file is already registered,
          since each task needs its own sentinel.
          """
          # _create_node() may already have built a Node for this task while
          # resolving the inputs of a consumer that was added earlier.
          # Reuse it, so that the task cannot appear twice in the graph.
          node = self.pypetask2node.get(pypetask)
          if node is None:
              node = self._create_node(pypetask)
          sentinel_done_fn = node.sentinel_done_fn()
          if sentinel_done_fn in self.sentinels:
              msg = 'FOUND sentinel {!r} twice: {!r} ({!r}) and {!r}\nNote: Each task needs its own sentinel (and preferably its own run-dir).'.format(sentinel_done_fn, node, pypetask, self.sentinels[sentinel_done_fn])
              raise Exception(msg)
          self.sentinels[sentinel_done_fn] = node
          self.graph.add_node(node)
          for need in node.needs:
              self.graph.add_edge(need, node)

      def addTasks(self, tlist):
          """Call addTask() on each element of tlist, in order."""
          for t in tlist:
              self.addTask(t)

      def refreshTargets(self, targets=None, updateFreq=10, exitOnFailure=True):
          """Run all unsatisfied tasks; see _refreshTargets() for details.

          'targets' is accepted but not used. On any exception, including
          KeyboardInterrupt, the task-queue is told to terminate its jobs
          before the exception is re-raised.
          """
          try:
              self._refreshTargets(updateFreq, exitOnFailure)
          except BaseException:
              self.tq.terminate()
              raise

      def _refreshTargets(self, updateFreq, exitOnFailure):
          """Raise Exception (eventually) on any failure.
          - updateFreq (seconds) is really a max; we climb toward it gradually, and we reset when things change.
          - exitOnFailure=False would allow us to keep running (for a while) when parallel jobs fail.
          - self.max_jobs: the max number of concurrently running jobs
            - possibly this should be the number of cpus in use, but for now it is qsub jobs, for simplicity.
          """
          # Note: With the 'blocking' pwatcher, we will have 1 thread per live qsub job.
          # If/when that becomes a problem, please re-write the pwatcher as Go or Nim.
          # This loop is single-threaded. If we ignore max_jobs,
          # then we would send large queries for no reason, but that is not really a big deal.
          # The Python 'blocking' pwatcher is the real reason to limit jobs, for now.
          assert networkx.algorithms.dag.is_directed_acyclic_graph(self.graph)
          assert isinstance(self.max_jobs, int)
          failures = 0
          unsatg = get_unsatisfied_subgraph(self.graph)
          ready = find_all_roots(unsatg)
          submitted = set()
          init_sleep_time = 0.1
          sleep_time = init_sleep_time
          slept_seconds_since_last_ping = 0.0
          num_iterations_since_last_ping = 0
          num_iterations_since_last_ping_max = 1
          LOG.info('Num unsatisfied: {}, graph: {}'.format(len(unsatg), len(self.graph)))
          while ready or submitted:
              # Nodes cannot be in ready or submitted unless they are also in unsatg.
              to_submit = set()
              if self.max_jobs <= 0:
                  msg = 'self.max_jobs={}'.format(self.max_jobs)
                  raise Exception(msg)
              while ready and (self.max_jobs > len(submitted) + len(to_submit)):
                  node = ready.pop()
                  to_submit.add(node)
                  LOG.info('About to submit: {!r}'.format(node))
              if to_submit:
                  unsubmitted = set(self.tq.enque(to_submit)) # In theory, this traps exceptions.
                  if unsubmitted:
                      msg = 'Failed to enqueue {} of {} jobs: {!r}'.format(
                          len(unsubmitted), len(to_submit), unsubmitted)
                      LOG.error(msg)
                      #ready.update(unsubmitted) # Resubmit only in pwatcher, if at all.
                      raise Exception(msg)
                  submitted.update(to_submit - unsubmitted)
              LOG.debug('N in queue: {} (max_jobs={})'.format(len(submitted), self.max_jobs))
              recently_done = set(self.tq.check_done())
              num_iterations_since_last_ping += 1
              if not recently_done:
                  if not submitted:
                      LOG.warning('Nothing is happening, and we had {} failures. Should we quit? Instead, we will just sleep.'.format(failures))
                      #break
                  # The gap between pings grows by one iteration after each ping.
                  if num_iterations_since_last_ping_max <= num_iterations_since_last_ping:
                      # Ping!
                      LOG.info('(slept for another {}s -- another {} loop iterations)'.format(
                          slept_seconds_since_last_ping, num_iterations_since_last_ping))
                      slept_seconds_since_last_ping = 0.0
                      num_iterations_since_last_ping = 0
                      num_iterations_since_last_ping_max += 1
                  slept_seconds_since_last_ping += sleep_time
                  time.sleep(sleep_time)
                  # Back off gradually, but never beyond updateFreq.
                  sleep_time = min(sleep_time + 0.1, updateFreq)
                  continue
              LOG.debug('recently_done: {!r}'.format([(rd, rd.satisfied()) for rd in recently_done]))
              LOG.debug('Num done in this iteration: {}'.format(len(recently_done)))
              sleep_time = init_sleep_time
              submitted -= recently_done
              recently_satisfied = set(n for n in recently_done if n.satisfied())
              # What remains in recently_done are the failures.
              recently_done -= recently_satisfied
              for node in recently_satisfied:
                  ready.update(find_next_ready_and_remove(unsatg, node))
              if recently_satisfied:
                  LOG.info('recently_satisfied:\n{}'.format(pprint.pformat(recently_satisfied)))
                  LOG.info('Num satisfied in this iteration: {}'.format(len(recently_satisfied)))
                  LOG.info('Num still unsatisfied: {}'.format(len(unsatg)))
              if recently_done:
                  msg = 'Some tasks are recently_done but not satisfied: {!r}'.format(recently_done)
                  LOG.error(msg)
                  LOG.error('ready: {!r}\n\tsubmitted: {!r}'.format(ready, submitted))
                  failures += len(recently_done)
                  if exitOnFailure:
                      raise Exception(msg)
          assert not submitted
          assert not ready
          if failures:
              raise Exception('We had {} failures. {} tasks remain unsatisfied.'.format(
                  failures, len(unsatg)))
  class NodeBase(object):
      """Graph node.
      Can be satisfied on demand, but usually we call execute() and later run the script.
      """
      def __init__(self, name, wdir, needs):
          self.__satisfied = None # satisfiable; None means "not yet known"
          self.name = name
          self.wdir = wdir
          self.needs = needs

      def __repr__(self):
          return 'Node({})'.format(self.name)

      def setSatisfied(self, s):
          """Memoize s as the answer of satisfied()."""
          self.__satisfied = s

      def workdir(self):
          return self.wdir

      def script_fn(self):
          """Path of the wrapper script, 'run.sh' in the work-dir."""
          return os.path.join(self.workdir(), 'run.sh')

      def sentinel_done_fn(self):
          """Path of the sentinel file: the wrapper script path plus '.done'."""
          return self.script_fn() + '.done'

      def satisfied(self):
          """Not just done, but successful.
          A memoized answer wins. Otherwise, if we see the sentinel file,
          then we memoize True.
          (If we finished a distributed job with exit-code 0, the answer was
          already memoized, so we do not need to wait for the sentinel.)
          """
          if self.__satisfied is None:
              if os.path.exists(self.sentinel_done_fn()):
                  self.setSatisfied(True)
              return self.__satisfied == True
          return self.__satisfied

      def satisfy(self):
          """Generate and run the script in-process, unless already satisfied."""
          if not self.__satisfied:
              # Technically, we might need to cd into wdir first.
              script_fn = self.execute()
              if script_fn:
                  run(script_fn)
              self.__satisfied = True

      def execute(self):
          """Write the wrapper script and return its path.

          The wrapper changes into the work-dir, runs the script returned by
          generate_script() with /bin/bash, and then touches the sentinel.
          A failure of generate_script() is logged and re-raised.
          """
          try:
              actual_script_fn = self.generate_script()
          except Exception:
              LOG.exception('Failed generate_script() for {!r}'.format(self))
              raise
          sentinel_done_fn = self.sentinel_done_fn()
          wdir = self.workdir()
          wrapper_lines = [
              '#!/bin/sh',
              'set -vex',
              'export PATH=$PATH:/bin',
              'cd {}'.format(wdir),
              '/bin/bash {}'.format(os.path.relpath(actual_script_fn, wdir)),
              'touch {}'.format(sentinel_done_fn),
          ]
          wrapper_fn = self.script_fn()
          #mkdirs(os.path.dirname(wrapper_fn))
          with open(wrapper_fn, 'w') as ofs:
              ofs.write('\n'.join(wrapper_lines) + '\n')
          return wrapper_fn

      def generate_script(self):
          """To be provided by subclasses: write the real script, return its path."""
          raise NotImplementedError(repr(self))
  class PypeNode(NodeBase):
      """A Node whose script runs a PypeTask via 'python3 -m pypeflow.do_task'.

      We will clean this up later. For now, it is pretty tightly coupled to PypeTask.
      """
      def __init__(self, name, wdir, pypetask, needs, use_tmpdir, pre_script):
          super(PypeNode, self).__init__(name, wdir, needs)
          self.pypetask = pypetask
          self.use_tmpdir = use_tmpdir
          self.pre_script = pre_script

      def generate_script(self):
          """Write template.sh, task.json and task.sh into the work-dir.

          Return the path of task.sh. Input and output paths are recorded in
          task.json relative to the work-dir.
          Raise Exception if the PypeTask has no bash_template.
          """
          wdir = os.path.normpath(self.wdir)
          mkdirs(wdir)
          pt = self.pypetask
          assert pt.wdir == self.wdir
          inputs = {k:os.path.relpath(v.path, wdir) for k,v in pt.inputs.items()}
          outputs = {k:os.path.relpath(v.path, wdir) for k,v in pt.outputs.items()}
          for v in outputs.values():
              assert not os.path.isabs(v), '{!r} is not relative'.format(v)
          bash_template = pt.bash_template
          LOG.debug("====>inputs={},\n====>outputs={},\n====>bash_template={}"
              .format(inputs, outputs, bash_template))
          if bash_template is None:
              raise Exception('We no longer support python functions as PypeTasks.')

          # Write bash_template.
          bash_script_fn = os.path.join(wdir, 'template.sh')
          message = '# Substitution will be similar to snakemake "shell".\n'
          if self.pre_script:
              message += self.pre_script + '\n'
          with open(bash_script_fn, 'w') as ofs:
              ofs.write(message + bash_template)

          task_desc = {
              'inputs': inputs,
              'outputs': outputs,
              'parameters': pt.parameters,
              'bash_template_fn' : 'template.sh',
          }
          task_content = json.dumps(task_desc, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
          task_json_fn = os.path.join(wdir, 'task.json')
          # Close explicitly (via 'with') so the content is on disk before
          # the job is submitted, and no handle is leaked.
          with open(task_json_fn, 'w') as ofs:
              ofs.write(task_content)

          python = 'python3' # sys.executable fails sometimes because of binwrapper: SE-152
          tmpdir_flag = '--tmpdir {}'.format(self.use_tmpdir) if self.use_tmpdir else ''
          cmd = '{} -m pypeflow.do_task {} {}'.format(python, tmpdir_flag, task_json_fn)
          env_setup = 'env | sort'
          if self.pre_script:
              env_setup = self.pre_script + '\n' + env_setup
          script_lines = [
              '#!/bin/bash',
              'onerror () {',
              'set -vx',
              'echo "FAILURE. Running top in $(pwd) (If you see -terminal database is inaccessible- you are using the python bin-wrapper, so you will not get diagnostic info. No big deal. This process is crashing anyway.)"',
              'rm -f top.txt',
              'which python',
              'which top',
              'env -u LD_LIBRARY_PATH top -b -n 1 >| top.txt &',
              'env -u LD_LIBRARY_PATH top -b -n 1 2>&1',
              'pstree -apl',
              '}',
              'trap onerror ERR',
              env_setup,
              'echo "HOSTNAME=$(hostname)"',
              'echo "PWD=$(pwd)"',
              'time {}'.format(cmd),
          ]
          script_content = '\n'.join(script_lines) + '\n'
          script_fn = os.path.join(wdir, 'task.sh')
          with open(script_fn, 'w') as ofs:
              ofs.write(script_content)
          return script_fn

      def old_generate_script(self):
          """Old style: call the task's python function, which by convention
          sets 'generated_script_fn' on the task; return that attribute.
          """
          pt = self.pypetask
          func = pt.func
          func(pt) # Run the function! Probably just generate a script.
          try:
              # Maybe we should require a script always.
              generated_script_fn = pt.generated_script_fn
          except Exception:
              LOG.exception('name={!r} URL={!r}'.format(pt.name, pt.URL))
              raise
          return generated_script_fn
  # This global exists only because we continue to support the old PypeTask style,
  # where a PypeLocalFile does not know the PypeTask which produces it.
  # (This also allows us to specify PypeTasks out of order, fwiw.)
  # Someday, we might require PypeTasks to depend on outputs of other PypeTasks explicitly;
  # then we can drop this dict.
  PRODUCERS = dict()


  def findPypeLocalFile(path):
      """Return the PypeLocalFile for an absolute path.

      The producer is looked up in PRODUCERS by the directory of path, taken
      relative to the current directory. Without a producer, a new
      PypeLocalFile with producer None is returned ("pure input"). Otherwise
      the producer's output with the same basename is returned, and it is
      an Exception if there is none.
      Do not call this for paths relative to their work-dirs.
      """
      assert os.path.isabs(path)
      rundir, basename = os.path.split(path)
      basedir = os.path.relpath(rundir)
      producer = PRODUCERS.get(basedir)
      if producer is None:
          LOG.debug('No producer PypeTask for basedir {!r} from path {!r} -- Pure input?'.format(
              basedir, path))
          return PypeLocalFile(path, None)
      for candidate in producer.outputs.values():
          if os.path.basename(candidate.path) == basename:
              return candidate
      raise Exception('Failed to find a PypeLocalFile for {!r} among outputs of {!r}\n\t(Note that pure inputs must not be in task directories.)'.format(
          path, producer))
  def find_work_dir(paths):
      """Return absolute path to directory of all these paths.
      Must be same for all (after normalizing each path); otherwise,
      or when paths is empty, raise Exception.
      """
      dirnames = {os.path.dirname(os.path.normpath(p)) for p in paths}
      if len(dirnames) == 1:
          (only_dirname,) = dirnames
          return os.path.abspath(only_dirname)
      raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {} in {}'.format(
          pprint.pformat(paths), pprint.pformat(dirnames)))
  class PypeLocalFile(object):
      """A file path (stored absolute) together with the task producing it, if any."""
      def __init__(self, path, producer):
          self.path = os.path.abspath(path)
          self.producer = producer

      def __repr__(self):
          producer = self.producer
          if not producer:
              # No producer: show the path relative to the current directory.
              return 'PLF({!r}, {!r})'.format(os.path.relpath(self.path), None)
          # Show the path relative to the producer's work-dir, and that
          # work-dir relative to the current directory.
          shown_path = os.path.relpath(self.path, producer.wdir)
          shown_wdir = os.path.relpath(producer.wdir)
          return 'PLF({!r}, {!r})'.format(shown_path, shown_wdir)
  def makePypeLocalFile(p, producer=None):
      """Return a new PypeLocalFile for path p, with the given producer (default None)."""
      plf = PypeLocalFile(p, producer)
      return plf
  def fn(p):
      """Return the absolute path for p, which is a PypeLocalFile or a path.

      This must be run in the top run-dir.
      All task funcs are executed there.
      """
      path = p.path if isinstance(p, PypeLocalFile) else p
      return os.path.abspath(path)
  def only_path(p):
      """Return p.path if p is a PypeLocalFile; otherwise return p unchanged."""
      return p.path if isinstance(p, PypeLocalFile) else p
  class Dist(object):
      """Settings describing how a task is to be distributed.

      Attributes:
        job_dict   -- dict with at least 'NPROC' and 'MB', plus user overrides
        local      -- as given to the constructor
        use_tmpdir -- as given, except forced to False when local is true
      """
      def __init__(self, NPROC=1, MB=4000, local=False, job_dict=None, use_tmpdir=None):
          """
          NPROC, MB  -- defaults for job_dict['NPROC'] and job_dict['MB'].
          local      -- if true, use_tmpdir is ignored (set to False).
          job_dict   -- optional mapping whose entries override everything,
                        including NPROC and MB; None means no overrides.
          use_tmpdir -- None means "not specified".
          """
          if local:
              # Keep it simple. If we run local-only, then do not even bother with tmpdir.
              # This helps with simpler scatter/gather tasks, which copy paths.
              use_tmpdir = False
          my_job_dict = {'NPROC': NPROC, 'MB': MB}
          # None replaces the former mutable default ({}), which was one
          # object shared by all calls.
          if job_dict:
              my_job_dict.update(job_dict) # user overrides everything
          self.job_dict = my_job_dict
          self.local = local
          self.use_tmpdir = use_tmpdir

      @property
      def pypeflow_nproc(self):
          return self.job_dict['NPROC']

      @property
      def pypeflow_mb(self):
          """per processor"""
          return self.job_dict['MB']
  def PypeTask(inputs, outputs, parameters=None, wdir=None, bash_template=None, dist=None):
      """A slightly messy factory because we want to support both strings and PypeLocalFiles, for now.
      This can alter dict values in inputs/outputs if they were not already PypeLocalFiles.

      When wdir is None, it is derived from the directory shared by all
      outputs. The new task is registered in PRODUCERS under its name, and
      it is an Exception if that name is already registered.
      """
      if dist is None:
          dist = Dist()
      LOG.debug('New PypeTask(wdir={!r},\n\tinputs={!r},\n\toutputs={!r})'.format(
          wdir, inputs, outputs))
      if wdir is None:
          #wdir = parameters.get('wdir', name) # One of these must be a string!
          wdir = find_work_dir([only_path(v) for v in outputs.values()])
          # Since we derived wdir from outputs, any relative paths should become absolute.
          for mapping in (outputs, inputs):
              for key, val in list(mapping.items()):
                  if isinstance(val, PypeLocalFile) or os.path.isabs(val):
                      continue
                  mapping[key] = os.path.abspath(val)
      # (With an explicit wdir, relpaths are relative to the provided wdir.)
      if not os.path.isabs(wdir):
          wdir = os.path.abspath(wdir)
      this = _PypeTask(inputs, outputs, wdir, parameters, bash_template, dist)
      #basedir = os.path.basename(wdir)
      basedir = this.name
      if basedir in PRODUCERS:
          raise Exception('Basedir {!r} already used for {!r}. Cannot create new PypeTask {!r}.'.format(
              basedir, PRODUCERS[basedir], this))
      LOG.debug('Added PRODUCERS[{!r}] = {!r}'.format(basedir, this))
      PRODUCERS[basedir] = this
      this.canonicalize() # Already abs, but make everything a PLF.
      this.assert_canonical()
      LOG.debug('Built {!r}'.format(this))
      return this
  class _PypeTask(object):
      """Adaptor from old PypeTask API.
      """
      def __init__(self, inputs, outputs, wdir, parameters, bash_template, dist):
          # The name is the work-dir relative to the current directory.
          name = os.path.relpath(wdir)
          self.inputs = inputs
          self.outputs = outputs
          # Always copy this, so caller can re-use, for convenience.
          self.parameters = dict({} if parameters is None else parameters)
          self.bash_template = bash_template
          self.wdir = wdir
          self.name = name
          self.URL = 'task://localhost/{}'.format(name)
          self.dist = dist
          assert self.wdir, 'No wdir for {!r} {!r}'.format(self.name, self.URL)
          LOG.debug('Created {!r}'.format(self))

      def __repr__(self):
          return 'PypeTask({!r}, {!r}, {!r}, {!r})'.format(self.name, self.wdir, pprint.pformat(self.outputs), pprint.pformat(self.inputs))

      def __call__(self, func):
          """Record func (and a dotted name for it) on this task; return the task."""
          self.func = func
          self.__name__ = '{}.{}'.format(func.__module__, func.__name__)
          return self

      def canonicalize(self):
          """Turn every input and output value into a PypeLocalFile.

          Outputs get this task as their producer. Inputs that are not
          PypeLocalFiles yet must be absolute paths, and are looked up with
          findPypeLocalFile().
          """
          for key, val in list(self.outputs.items()):
              if isinstance(val, PypeLocalFile):
                  val.producer = self
                  continue
              # If relative, then it is relative to our wdir.
              abs_path = val if os.path.isabs(val) else os.path.join(self.wdir, val)
              self.outputs[key] = PypeLocalFile(abs_path, self)
          for key, val in list(self.inputs.items()):
              if isinstance(val, PypeLocalFile):
                  continue
              assert os.path.isabs(val), 'Inputs cannot be relative at self point: {!r} in {!r}'.format(val, self)
              self.inputs[key] = findPypeLocalFile(val)

      def assert_canonical(self):
          """Assert that all paths are absolute and no key is both input and output."""
          # Output values will be updated after PypeTask init, so refer back to self as producer.
          for plf in self.inputs.values():
              assert os.path.isabs(plf.path), 'For {!r}, input {!r} is not absolute'.format(self.wdir, plf)
          for plf in self.outputs.values():
              assert os.path.isabs(plf.path), 'For {!r}, output {!r} is not absolute'.format(self.wdir, plf)
          common = set(self.inputs) & set(self.outputs)
          assert (not common), 'A key is used for both inputs and outputs of PypeTask({}), which could be ok, but only if we refer to them as input.foo and output.foo in the bash script: {!r}'.format(self.wdir, common)
  MyFakePypeThreadTaskBase = None # just a symbol, not really used


  # Here is the main factory.
  def PypeProcWatcherWorkflow(
          URL = None,
          job_defaults=None,
          squash = False,
          **attributes):
      """Factory for the workflow.

      URL and attributes are accepted but not used.
      job_defaults is a mapping (None means empty) from which these keys
      are read, with these defaults:
          'job_type'           'local'
          'job_name_style'     ''  (falsy means 0; index into JOBID_GENERATORS)
          'pwatcher_type'      'fs_based' ('blocking' and 'network_based'
                               are recognized; anything else is fs_based)
          'pwatcher_directory' 'mypwatcher'
          'use_tmpdir'         None
          'njobs'              24
      The whole mapping is also handed to the Workflow as job_defaults_dict.
      """
      # A fresh dict per call: the mapping is stored by the task-queue and
      # passed on to the watcher, so it must not be one shared default.
      if job_defaults is None:
          job_defaults = dict()
      job_type = job_defaults.get('job_type', 'local')
      job_name_style = job_defaults.get('job_name_style', '')
      watcher_type = job_defaults.get('pwatcher_type', 'fs_based')
      watcher_directory = job_defaults.get('pwatcher_directory', 'mypwatcher')
      use_tmpdir = job_defaults.get('use_tmpdir', None)
      max_jobs = int(job_defaults.get('njobs', 24)) # must be > 0, but not too high
      if watcher_type == 'blocking':
          pwatcher_impl = pwatcher.blocking
      elif watcher_type == 'network_based':
          pwatcher_impl = pwatcher.network_based
      else:
          pwatcher_impl = pwatcher.fs_based
      LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
      watcher = pwatcher_impl.get_process_watcher(watcher_directory)
      if use_tmpdir:
          try:
              if not os.path.isabs(use_tmpdir):
                  use_tmpdir = os.path.abspath(use_tmpdir)
          except (TypeError, AttributeError):
              # Truthy but not a path (e.g. True): use the system temp dir.
              use_tmpdir = tempfile.gettempdir()
      if not job_name_style:
          job_name_style = 0
      LOG.info('job_type={!r}, (default)job_defaults={!r}, use_tmpdir={!r}, squash={!r}, job_name_style={!r}'.format(
          job_type, job_defaults, use_tmpdir, squash, job_name_style,
      ))
      jobid_generator = int(job_name_style)
      return Workflow(watcher,
              job_type=job_type, job_defaults_dict=job_defaults, max_jobs=max_jobs, use_tmpdir=use_tmpdir,
              squash=squash, jobid_generator=jobid_generator,
      )
      #th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue)
      #mq = MyMessageQueue()
      #se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
      #return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
      #        attributes=attributes)


  __all__ = ['PypeProcWatcherWorkflow', 'fn', 'makePypeLocalFile', 'MyFakePypeThreadTaskBase', 'PypeTask']