network_heartbeat.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  """Network server heartbeat wrapper.

  Perl might be more efficient, but we use Python for now.
  A non-zero exit status means *this wrapper* failed, not the wrapped command.
  """
  6. import argparse
  7. import os
  8. import shlex
  9. import socket
  10. import subprocess
  11. import sys
  12. import threading
  13. import time
  # Text argparse prints before (DESCRIPTION) and after (EPILOG) the option list.
  DESCRIPTION = "\nWe wrap a system call to produce a heartbeat.\n"
  EPILOG = (
      "\n"
      "We log to the status server, and forward command stdout/stderr as well.\n"
  )


  class _Formatter(argparse.RawDescriptionHelpFormatter,
                   argparse.ArgumentDefaultsHelpFormatter):
      """Help formatter combining the raw-description and show-defaults mixins."""


  # Single indirection so the parser's formatter can be swapped in one place.
  _FORMATTER_CLASS = _Formatter
  def parse_args(args):
      """Parse the wrapper's command-line arguments.

      Args:
          args: List of argument strings, without the program name.

      Returns:
          The ``argparse.Namespace`` with ``rate``, ``heartbeat_server``,
          ``heartbeat_port``, ``jobid``, ``exit_dir``, ``directory`` and
          ``command`` (a non-empty list of words).
      """
      cli = argparse.ArgumentParser(
          formatter_class=_FORMATTER_CLASS,
          epilog=EPILOG,
          description=DESCRIPTION,
      )
      # (name, keyword arguments) pairs, registered in this order.
      option_table = [
          ('--rate', {
              'help': 'Heartbeat rate, in seconds',
              'type': int,
              'default': 600,
          }),
          ('--heartbeat-server', {
              'help': 'Address of the heartbeat server',
              'required': True,
          }),
          ('--heartbeat-port', {
              'help': 'Port of the heartbeat server',
              'type': int,
              'required': True,
          }),
          ('--jobid', {
              'help': 'Our jobid',
              'required': True,
          }),
          ('--exit-dir', {
              'help': 'Path to emergency exit sentinel directory',
              'required': True,
          }),
          ('--directory', {
              'help': 'Directory in which to run COMMAND.',
              'default': '.',
          }),
          ('command', {
              'help': 'System call (to be joined by " "). We will block on this and return its result.',
              'nargs': '+',
          }),
      ]
      for name, options in option_table:
          cli.add_argument(name, **options)
      return cli.parse_args(args)
  def socket_send(socket, message):
      """Send one message on a connected socket, terminated by a NUL byte.

      Args:
          socket: Connected socket object; only ``sendall`` is used. The
              parameter name shadows the ``socket`` module inside this function
              and is kept unchanged for callers.
          message: Text or bytes payload. Text is encoded as UTF-8.
      """
      # bytes has no ``.format`` on Python 3, so build the frame by encoding
      # text and appending the delimiter as bytes.
      if not isinstance(message, bytes):
          message = message.encode('utf-8')
      socket.sendall(message + b'\0')
  def log(heartbeat_server, jobid, msg):
      """Send one status line for a job to the heartbeat server, best-effort.

      Opens a fresh connection, sends ``'s <jobid> <msg>\\n'`` and closes it.
      I/O errors are swallowed on purpose: losing a log line is preferable to
      terminating the wrapper.

      Args:
          heartbeat_server: Address passed to ``socket.connect``.
          jobid: Job identifier placed in the message.
          msg: Text to log.
      """
      hsocket = socket.socket()
      try:
          hsocket.connect(heartbeat_server)
          socket_send(hsocket, 's {} {}\n'.format(jobid, msg))
      except IOError:  # better to miss a line than terminate
          pass
      finally:
          # Release the descriptor on failure too, not only after a
          # successful send.
          hsocket.close()
  def thread_heartbeat(heartbeat_server, jobid, sleep_s):
      """Announce this process to the server, then send heartbeats forever.

      First sends ``'i <jobid> <pid> <pgid>'``, then loops: sleep ``sleep_s``
      seconds and send ``'h <jobid>'``. Never returns; intended as a thread
      target.

      Args:
          heartbeat_server: Address passed to ``socket.connect``.
          jobid: Job identifier placed in each message.
          sleep_s: Seconds to sleep between heartbeats.
      """
      pid = os.getpid()
      pgid = os.getpgid(0)
      _heartbeat_send(heartbeat_server, 'i {} {} {}'.format(jobid, pid, pgid))
      beat = 'h {}'.format(jobid)
      while True:
          time.sleep(sleep_s)
          _heartbeat_send(heartbeat_server, beat)


  def _heartbeat_send(heartbeat_server, message):
      """Deliver one message over a fresh connection, ignoring I/O failures."""
      hsocket = socket.socket()
      try:
          hsocket.connect(heartbeat_server)
          socket_send(hsocket, message)
      except IOError:  # we hope it's a temporary error
          pass
      finally:
          # Without this, every failed beat would leak one socket for the
          # lifetime of the job.
          hsocket.close()
  def start_heartbeat(heartbeat_server, jobid, sleep_s):
      """Start the heartbeat loop in a daemon thread and return the thread.

      The thread's liveness is logged once before it is started.

      Args:
          heartbeat_server: Address forwarded to ``thread_heartbeat`` and ``log``.
          jobid: Job identifier forwarded to both.
          sleep_s: Seconds between heartbeats.

      Returns:
          The started ``threading.Thread``.
      """
      worker = threading.Thread(
          target=thread_heartbeat,
          args=(heartbeat_server, jobid, sleep_s),
      )
      alive_before_start = bool(worker.is_alive())
      log(heartbeat_server, jobid, 'alive? {}'.format(alive_before_start))
      # Daemon thread: it must not keep the process alive once the main thread
      # exits.
      worker.daemon = True
      worker.start()
      return worker
  def run(args):
      """Run the wrapped command under a heartbeat and report its outcome.

      In order: log the arguments, change into ``args.directory``, try to
      become a process-group leader, start the heartbeat thread, run the
      command while forwarding each line of its combined stdout/stderr to the
      server, send the exit status to the server, and write the exit status to
      ``<exit_dir>/<jobid>``.

      Args:
          args: Namespace providing ``heartbeat_server``, ``heartbeat_port``,
              ``jobid``, ``directory``, ``exit_dir``, ``rate`` and ``command``.
      """
      heartbeat_server = (args.heartbeat_server, args.heartbeat_port)
      jobid = args.jobid
      log(heartbeat_server, jobid, repr(args))

      os.chdir(args.directory)
      exit_dir = args.exit_dir
      # Resolved after the chdir above, so a relative exit dir is taken
      # relative to args.directory.
      exit_fn = os.path.join(os.path.abspath(exit_dir), jobid)
      cwd = os.getcwd()
      hostname = socket.getfqdn()
      sleep_s = args.rate
      summary = (
          '\ncwd:{cwd!r}'
          '\nhostname={hostname}'
          '\nheartbeat_server={heartbeat_server!r}'
          '\njobid={jobid}'
          '\nexit_dir={exit_dir!r}'
          '\nsleep_s={sleep_s!r}'
      ).format(
          cwd=cwd,
          hostname=hostname,
          heartbeat_server=heartbeat_server,
          jobid=jobid,
          exit_dir=exit_dir,
          sleep_s=sleep_s,
      )
      log(heartbeat_server, jobid, summary)

      log(heartbeat_server, jobid, "before setpgid: pid={} pgid={}".format(os.getpid(), os.getpgid(0)))
      try:
          os.setpgid(0, 0)  # This allows the entire tree of procs to be killed.
          log(heartbeat_server, jobid, " after setpgid: pid={} pgid={}".format(
              os.getpid(), os.getpgid(0)))
      except OSError as e:
          log(heartbeat_server, jobid, ' Unable to set pgid. Possibly a grid job? Hopefully there will be no dangling processes when killed: {}'.format(
              repr(e)))

      thread = start_heartbeat(heartbeat_server, jobid, sleep_s)
      log(heartbeat_server, jobid, 'alive? {} pid={} pgid={}'.format(
          bool(thread.is_alive()), os.getpid(), os.getpgid(0)))

      call = ' '.join(args.command)
      log(heartbeat_server, jobid, 'In cwd: {}, Blocking call: {!r}'.format(
          os.getcwd(), call))
      sp = subprocess.Popen(shlex.split(call), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

      # Forward all output to the server until the job ends, then get the exit
      # value.
      with sp.stdout as f:
          for line in iter(f.readline, b''):
              # On Python 3 the pipe yields bytes; decode so the message holds
              # the text itself rather than a b'...' repr.
              if not isinstance(line, str):
                  line = line.decode('utf-8', 'replace')
              # can't use log() for this because it appends a \n
              hsocket = socket.socket()
              try:
                  hsocket.connect(heartbeat_server)
                  socket_send(hsocket, 's {} {}'.format(jobid, line))
              except IOError:  # better to miss a line than terminate
                  pass
              finally:
                  hsocket.close()  # also on failure, to avoid leaking per line
      rc = sp.wait()

      log(heartbeat_server, jobid, ' returned: {!r}'.format(
          rc))

      hsocket = socket.socket()
      try:
          hsocket.connect(heartbeat_server)
          socket_send(hsocket, 'e {} {}'.format(jobid, rc))
      except IOError as e:
          log(heartbeat_server, jobid, 'could not update heartbeat server with exit status: {} {}: {!r}'.format(jobid, rc, e))
      finally:
          hsocket.close()

      # The exit file records the command's status even if the server was
      # unreachable.
      with open(exit_fn, 'w') as f:
          f.write(str(rc))
      # The command's status is deliberately not used as our own exit status
      # (no sys.exit(rc)): no-one would see it anyway.
  def main():
      """Script entry point: parse ``sys.argv`` and run the wrapped command."""
      run(parse_args(sys.argv[1:]))


  if __name__ == "__main__":
      main()