# pypeflow_example.py — minimal two-task pypeflow workflow example (~3.3 KB)
  1. from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
  2. makePypeLocalFile, fn, PypeTask)
  3. import json
  4. import logging.config
  5. import os
  6. import sys
# Runtime knobs, overridable via environment variables.
JOB_TYPE = os.environ.get('JOB_TYPE', 'local')  # pwatcher job type; defaults to local execution
SLEEP_S = os.environ.get('SLEEP_S', '1')  # per-task sleep duration; kept as a string for bash substitution
log = logging.getLogger(__name__)  # module-level logger
  10. def spawn(args, check=False):
  11. cmd = args[0]
  12. log.debug('$(%s %s)' %(cmd, repr(args)))
  13. rc = os.spawnv(os.P_WAIT, cmd, args) # spawnvp for PATH lookup
  14. msg = "Call %r returned %d." % (cmd, rc)
  15. if rc:
  16. log.warning(msg)
  17. if check:
  18. raise Exception(msg)
  19. else:
  20. log.debug(msg)
  21. return rc
  22. def system(call, check=False):
  23. log.debug('$(%s)' %repr(call))
  24. rc = os.system(call)
  25. msg = "Call %r returned %d." % (call, rc)
  26. if rc:
  27. log.warning(msg)
  28. if check:
  29. raise Exception(msg)
  30. else:
  31. log.debug(msg)
  32. return rc
  33. def makedirs(d):
  34. if not os.path.isdir(d):
  35. os.makedirs(d)
  36. def taskrun0(self):
  37. template = """
  38. sleep_s=%(sleep_s)s
  39. ofile=%(ofile)s
  40. set -vex
  41. echo start0
  42. sleep ${sleep_s}
  43. touch ${ofile}
  44. echo end0
  45. """
  46. bash = template %dict(
  47. #ifile=fn(self.i0),
  48. ofile=fn(self.f0),
  49. sleep_s=self.parameters['sleep_s'],
  50. )
  51. log.debug('taskrun0 bash:\n' + bash)
  52. script = 'taskrun0.sh'
  53. with open(script, 'w') as ofs:
  54. ofs.write(bash)
  55. #system("bash {}".format(script), check=True)
  56. #spawn(['/bin/bash', script], check=True) # Beware! Hard to kill procs.
  57. self.generated_script_fn = script
  58. return script
  59. def taskrun1(self):
  60. template = """
  61. sleep_s=%(sleep_s)s
  62. ifile=%(ifile)s
  63. ofile=%(ofile)s
  64. set -vex
  65. echo start1
  66. sleep ${sleep_s}
  67. cp -f ${ifile} ${ofile}
  68. echo end1
  69. """
  70. bash = template %dict(
  71. ifile=fn(self.f0),
  72. ofile=fn(self.f1),
  73. sleep_s=self.parameters['sleep_s'],
  74. )
  75. log.debug('taskrun1 bash:\n' + bash)
  76. script = 'taskrun1.sh'
  77. with open(script, 'w') as ofs:
  78. ofs.write(bash)
  79. #system("bash {}".format(script), check=True)
  80. self.generated_script_fn = script
  81. return script
  82. def main():
  83. lfn = 'logging-cfg.json'
  84. if os.path.exists(lfn):
  85. logging.config.dictConfig(json.load(open(lfn)))
  86. else:
  87. logging.basicConfig()
  88. logging.getLogger().setLevel(logging.NOTSET)
  89. try:
  90. import logging_tree
  91. logging_tree.printout()
  92. except ImportError:
  93. pass
  94. log.debug('DEBUG LOGGING ON')
  95. log.warning('Available via env: JOB_TYPE={}, SLEEP_S={}'.format(
  96. JOB_TYPE, SLEEP_S))
  97. exitOnFailure=False
  98. concurrent_jobs=2
  99. Workflow = PypeProcWatcherWorkflow
  100. wf = Workflow(job_type=JOB_TYPE)
  101. wf.max_jobs = concurrent_jobs
  102. par = dict(sleep_s=SLEEP_S)
  103. DIR ='mytmp'
  104. makedirs(DIR)
  105. f0 = makePypeLocalFile('mytmp/f0')
  106. f1 = makePypeLocalFile('mytmp/f1')
  107. make_task = PypeTask(
  108. inputs = {},
  109. outputs = {'f0': f0},
  110. parameters = par,
  111. )
  112. task = make_task(taskrun0)
  113. wf.addTasks([task])
  114. make_task = PypeTask(
  115. inputs = {'f0': f0},
  116. outputs = {'f1': f1},
  117. parameters = par,
  118. )
  119. task = make_task(taskrun1)
  120. wf.addTasks([task])
  121. wf.refreshTargets([task])
  122. #wf.refreshTargets(exitOnFailure=exitOnFailure)
# Script entry point: run the demo workflow when executed directly.
if __name__ == "__main__":
    main()