  1. """
  2. Module for performing checks on an Fluentd logging deployment
  3. """
  4. import json
  5. from openshift_checks import get_var
  6. from openshift_checks.logging.logging import LoggingCheck
  7. class Fluentd(LoggingCheck):
  8. """Module that checks an integrated logging Fluentd deployment"""
  9. name = "fluentd"
  10. tags = ["health", "logging"]
  11. logging_namespace = None
  12. def run(self, tmp, task_vars):
  13. """Check various things and gather errors. Returns: result as hash"""
  14. self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging")
  15. fluentd_pods, error = super(Fluentd, self).get_pods_for_component(
  16. self.execute_module,
  17. self.logging_namespace,
  18. "fluentd",
  19. task_vars,
  20. )
  21. if error:
  22. return {"failed": True, "changed": False, "msg": error}
  23. check_error = self.check_fluentd(fluentd_pods, task_vars)
  24. if check_error:
  25. msg = ("The following Fluentd deployment issue was found:"
  26. "\n-------\n"
  27. "{}".format(check_error))
  28. return {"failed": True, "changed": False, "msg": msg}
  29. # TODO(lmeyer): run it all again for the ops cluster
  30. return {"failed": False, "changed": False, "msg": 'No problems found with Fluentd deployment.'}
  31. @staticmethod
  32. def _filter_fluentd_labeled_nodes(nodes_by_name, node_selector):
  33. """Filter to all nodes with fluentd label. Returns dict(name: node), error string"""
  34. label, value = node_selector.split('=', 1)
  35. fluentd_nodes = {
  36. name: node for name, node in nodes_by_name.items()
  37. if node['metadata']['labels'].get(label) == value
  38. }
  39. if not fluentd_nodes:
  40. return None, (
  41. 'There are no nodes with the fluentd label {label}.\n'
  42. 'This means no logs will be aggregated from the nodes.'
  43. ).format(label=node_selector)
  44. return fluentd_nodes, None
  45. @staticmethod
  46. def _check_node_labeling(nodes_by_name, fluentd_nodes, node_selector, task_vars):
  47. """Note if nodes are not labeled as expected. Returns: error string"""
  48. intended_nodes = get_var(task_vars, 'openshift_logging_fluentd_hosts', default=['--all'])
  49. if not intended_nodes or '--all' in intended_nodes:
  50. intended_nodes = nodes_by_name.keys()
  51. nodes_missing_labels = set(intended_nodes) - set(fluentd_nodes.keys())
  52. if nodes_missing_labels:
  53. return (
  54. 'The following nodes are supposed to be labeled with {label} but are not:\n'
  55. ' {nodes}\n'
  56. 'Fluentd will not aggregate logs from these nodes.'
  57. ).format(label=node_selector, nodes=', '.join(nodes_missing_labels))
  58. return None
  59. @staticmethod
  60. def _check_nodes_have_fluentd(pods, fluentd_nodes):
  61. """Make sure fluentd is on all the labeled nodes. Returns: error string"""
  62. unmatched_nodes = fluentd_nodes.copy()
  63. node_names_by_label = {
  64. node['metadata']['labels']['kubernetes.io/hostname']: name
  65. for name, node in fluentd_nodes.items()
  66. }
  67. node_names_by_internal_ip = {
  68. address['address']: name
  69. for name, node in fluentd_nodes.items()
  70. for address in node['status']['addresses']
  71. if address['type'] == "InternalIP"
  72. }
  73. for pod in pods:
  74. for name in [
  75. pod['spec']['nodeName'],
  76. node_names_by_internal_ip.get(pod['spec']['nodeName']),
  77. node_names_by_label.get(pod.get('spec', {}).get('host')),
  78. ]:
  79. unmatched_nodes.pop(name, None)
  80. if unmatched_nodes:
  81. return (
  82. 'The following nodes are supposed to have a Fluentd pod but do not:\n'
  83. '{nodes}'
  84. 'These nodes will not have their logs aggregated.'
  85. ).format(nodes=''.join(
  86. " {}\n".format(name)
  87. for name in unmatched_nodes.keys()
  88. ))
  89. return None
  90. def _check_fluentd_pods_running(self, pods):
  91. """Make sure all fluentd pods are running. Returns: error string"""
  92. not_running = super(Fluentd, self).not_running_pods(pods)
  93. if not_running:
  94. return (
  95. 'The following Fluentd pods are supposed to be running but are not:\n'
  96. '{pods}'
  97. 'These pods will not aggregate logs from their nodes.'
  98. ).format(pods=''.join(
  99. " {} ({})\n".format(pod['metadata']['name'], pod['spec'].get('host', 'None'))
  100. for pod in not_running
  101. ))
  102. return None
  103. def check_fluentd(self, pods, task_vars):
  104. """Verify fluentd is running everywhere. Returns: error string"""
  105. node_selector = get_var(task_vars, 'openshift_logging_fluentd_nodeselector',
  106. default='logging-infra-fluentd=true')
  107. nodes_by_name, error = self.get_nodes_by_name(task_vars)
  108. if error:
  109. return error
  110. fluentd_nodes, error = self._filter_fluentd_labeled_nodes(nodes_by_name, node_selector)
  111. if error:
  112. return error
  113. error_msgs = []
  114. error = self._check_node_labeling(nodes_by_name, fluentd_nodes, node_selector, task_vars)
  115. if error:
  116. error_msgs.append(error)
  117. error = self._check_nodes_have_fluentd(pods, fluentd_nodes)
  118. if error:
  119. error_msgs.append(error)
  120. error = self._check_fluentd_pods_running(pods)
  121. if error:
  122. error_msgs.append(error)
  123. # Make sure there are no extra fluentd pods
  124. if len(pods) > len(fluentd_nodes):
  125. error_msgs.append(
  126. 'There are more Fluentd pods running than nodes labeled.\n'
  127. 'This may not cause problems with logging but it likely indicates something wrong.'
  128. )
  129. return '\n'.join(error_msgs)
  130. def get_nodes_by_name(self, task_vars):
  131. """Retrieve all the node definitions. Returns: dict(name: node), error string"""
  132. nodes_json = self._exec_oc("get nodes -o json", [], task_vars)
  133. try:
  134. nodes = json.loads(nodes_json)
  135. except ValueError: # no valid json - should not happen
  136. return None, "Could not obtain a list of nodes to validate fluentd. Output from oc get:\n" + nodes_json
  137. if not nodes or not nodes.get('items'): # also should not happen
  138. return None, "No nodes appear to be defined according to the API."
  139. return {
  140. node['metadata']['name']: node
  141. for node in nodes['items']
  142. }, None
  143. def _exec_oc(self, cmd_str, extra_args, task_vars):
  144. return super(Fluentd, self).exec_oc(self.execute_module,
  145. self.logging_namespace,
  146. cmd_str,
  147. extra_args,
  148. task_vars)