fs_heartbeat.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
"""Filesystem heartbeat wrapper.

Run a shell command (blocking) and, when it finishes, publish its status in an
exit-sentinel file, written atomically via a temporary file plus rename, so a
watcher polling the filesystem can detect completion. A heartbeat-file writer
is also defined here, but run() currently does not start it.

Perl might be more efficient for this, but we use Python for now.

A non-zero exit status from *this* script means the wrapper itself failed, not
the wrapped command; the wrapped command's status goes into the exit file.
"""
  6. import argparse
  7. import os
  8. import socket
  9. import sys
  10. import threading
  11. import time
  12. DESCRIPTION = """
  13. We wrap a system call to produce both a heartbeat and an exit-sentinel
  14. in the filesystem.
  15. """
  16. EPILOG = """
  17. We share stderr/stdout with the command. We log to stderr (for now).
  18. """
  19. HEARTBEAT_TEMPLATE = '0 {pid} {pgid}\n'
  20. EXIT_TEMPLATE = '{exit_code}'
  21. class _Formatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
  22. pass
  23. _FORMATTER_CLASS = _Formatter
  24. def parse_args(args):
  25. parser = argparse.ArgumentParser(
  26. description=DESCRIPTION,
  27. epilog=EPILOG,
  28. formatter_class=_FORMATTER_CLASS,
  29. )
  30. parser.add_argument('--rate',
  31. help='Heartbeat rate, in seconds',
  32. type=float,
  33. default=1.0, # TODO: Make this at least 10, maybe 60.
  34. )
  35. parser.add_argument('--heartbeat-file',
  36. help='Path to heartbeat file. The first line will have the format {!r}. The rest are just elapsed time'.format(
  37. HEARTBEAT_TEMPLATE),
  38. required=True,
  39. )
  40. parser.add_argument('--exit-file',
  41. help='Path to exit sentinel file. At end, it will have the format {!r}'.format(
  42. EXIT_TEMPLATE),
  43. required=True,
  44. )
  45. parser.add_argument('--directory',
  46. help='Directory in which to run COMMAND.',
  47. default='.',
  48. )
  49. parser.add_argument('command',
  50. help='System call (to be joined by " "). We will block on this and return its result.',
  51. nargs='+',
  52. #required=True,
  53. )
  54. return parser.parse_args(args)
  55. def log(msg):
  56. sys.stderr.write(msg)
  57. sys.stderr.write('\n')
  58. #sys.stdout.flush() # If we use stdout.
  59. def thread_heartbeat(heartbeat_fn, sleep_s):
  60. with open(heartbeat_fn, 'w') as ofs:
  61. pid = os.getpid()
  62. pgid = os.getpgid(0)
  63. ofs.write(HEARTBEAT_TEMPLATE.format(
  64. **locals()))
  65. elapsed = 0
  66. ctime = 0
  67. while True:
  68. #ctime = time.time()
  69. ofs.write('{elapsed} {ctime}\n'.format(
  70. **locals()))
  71. ofs.flush()
  72. time.sleep(sleep_s)
  73. elapsed += 1
  74. def start_heartbeat(heartbeat_fn, sleep_s):
  75. hb = threading.Thread(target=thread_heartbeat, args=(heartbeat_fn, sleep_s))
  76. log('alive? {}'.format(
  77. bool(hb.is_alive())))
  78. hb.daemon = True
  79. hb.start()
  80. return hb
  81. def run(args):
  82. os.chdir(args.directory)
  83. heartbeat_fn = os.path.abspath(args.heartbeat_file)
  84. exit_fn = os.path.abspath(args.exit_file)
  85. cwd = os.getcwd()
  86. hostname = socket.getfqdn()
  87. sleep_s = args.rate
  88. log("""
  89. cwd:{cwd!r}
  90. hostname={hostname}
  91. heartbeat_fn={heartbeat_fn!r}
  92. exit_fn={exit_fn!r}
  93. sleep_s={sleep_s!r}""".format(
  94. **locals()))
  95. if os.path.exists(exit_fn):
  96. os.remove(exit_fn)
  97. if os.path.exists(heartbeat_fn):
  98. os.remove(heartbeat_fn)
  99. #os.system('touch {}'.format(heartbeat_fn)) # This would be over-written anyway.
  100. log("before setpgid: pid={} pgid={}".format(os.getpid(), os.getpgid(0)))
  101. try:
  102. os.setpgid(0, 0) # This allows the entire tree of procs to be killed.
  103. log(" after setpgid: pid={} pgid={}".format(
  104. os.getpid(), os.getpgid(0)))
  105. except OSError as e:
  106. log(' Unable to set pgid. Possibly a grid job? Hopefully there will be no dangling processes when killed: {}'.format(
  107. repr(e)))
  108. #thread = start_heartbeat(heartbeat_fn, sleep_s)
  109. #log('alive? {} pid={} pgid={}'.format(
  110. # bool(thread.is_alive()), os.getpid(), os.getpgid(0)))
  111. call = ' '.join(args.command)
  112. log('In cwd: {}, Blocking call: {!r}'.format(
  113. os.getcwd(), call))
  114. rc = os.system(call) # Blocking.
  115. log(' returned: {!r}'.format(
  116. rc))
  117. # Do not delete the heartbeat here. The discoverer of the exit-sentinel will do that,
  118. # to avoid a race condition.
  119. #if os.path.exists(heartbeat_fn):
  120. # os.remove(heartbeat_fn)
  121. exit_tmp_fn = exit_fn + '.tmp'
  122. with open(exit_tmp_fn, 'w') as ofs:
  123. ofs.write(EXIT_TEMPLATE.format(
  124. exit_code=rc))
  125. os.rename(exit_tmp_fn, exit_fn) # atomic
  126. # sys.exit(rc) # No-one would see this anyway.
  127. def main():
  128. args = parse_args(sys.argv[1:])
  129. log(repr(args))
  130. run(args)
  131. if __name__ == "__main__":
  132. main()