# fluentd.py
  1. """Check for an aggregated logging Fluentd deployment"""
  2. import json
  3. from openshift_checks import OpenShiftCheckException, OpenShiftCheckExceptionList
  4. from openshift_checks.logging.logging import LoggingCheck
  5. class Fluentd(LoggingCheck):
  6. """Check for an aggregated logging Fluentd deployment"""
  7. name = "fluentd"
  8. tags = ["health", "logging"]
  9. def run(self):
  10. """Check the Fluentd deployment and raise an error if any problems are found."""
  11. fluentd_pods = self.get_pods_for_component("fluentd")
  12. self.check_fluentd(fluentd_pods)
  13. return {}
  14. def check_fluentd(self, pods):
  15. """Verify fluentd is running everywhere. Raises OpenShiftCheckExceptionList if error(s) found."""
  16. node_selector = self.get_var(
  17. 'openshift_logging_fluentd_nodeselector',
  18. default='logging-infra-fluentd=true'
  19. )
  20. nodes_by_name = self.get_nodes_by_name()
  21. fluentd_nodes = self.filter_fluentd_labeled_nodes(nodes_by_name, node_selector)
  22. errors = []
  23. errors += self.check_node_labeling(nodes_by_name, fluentd_nodes, node_selector)
  24. errors += self.check_nodes_have_fluentd(pods, fluentd_nodes)
  25. errors += self.check_fluentd_pods_running(pods)
  26. # Make sure there are no extra fluentd pods
  27. if len(pods) > len(fluentd_nodes):
  28. errors.append(OpenShiftCheckException(
  29. 'TooManyFluentdPods',
  30. 'There are more Fluentd pods running than nodes labeled.\n'
  31. 'This may not cause problems with logging but it likely indicates something wrong.'
  32. ))
  33. if errors:
  34. raise OpenShiftCheckExceptionList(errors)
  35. def get_nodes_by_name(self):
  36. """Retrieve all the node definitions. Returns: dict(name: node)"""
  37. nodes_json = self.exec_oc("get nodes -o json", [])
  38. try:
  39. nodes = json.loads(nodes_json)
  40. except ValueError: # no valid json - should not happen
  41. raise OpenShiftCheckException(
  42. "BadOcNodeList",
  43. "Could not obtain a list of nodes to validate fluentd.\n"
  44. "Output from oc get:\n" + nodes_json
  45. )
  46. if not nodes or not nodes.get('items'): # also should not happen
  47. raise OpenShiftCheckException(
  48. "NoNodesDefined",
  49. "No nodes appear to be defined according to the API."
  50. )
  51. return {
  52. node['metadata']['name']: node
  53. for node in nodes['items']
  54. }
  55. @staticmethod
  56. def filter_fluentd_labeled_nodes(nodes_by_name, node_selector):
  57. """Filter to all nodes with fluentd label. Returns dict(name: node)"""
  58. label, value = node_selector.split('=', 1)
  59. fluentd_nodes = {
  60. name: node for name, node in nodes_by_name.items()
  61. if node['metadata']['labels'].get(label) == value
  62. }
  63. if not fluentd_nodes:
  64. raise OpenShiftCheckException(
  65. 'NoNodesLabeled',
  66. 'There are no nodes with the fluentd label {label}.\n'
  67. 'This means no logs will be aggregated from the nodes.'.format(label=node_selector)
  68. )
  69. return fluentd_nodes
  70. def check_node_labeling(self, nodes_by_name, fluentd_nodes, node_selector):
  71. """Note if nodes are not labeled as expected. Returns: error list"""
  72. intended_nodes = self.get_var('openshift_logging_fluentd_hosts', default=['--all'])
  73. if not intended_nodes or '--all' in intended_nodes:
  74. intended_nodes = nodes_by_name.keys()
  75. nodes_missing_labels = set(intended_nodes) - set(fluentd_nodes.keys())
  76. if nodes_missing_labels:
  77. return [OpenShiftCheckException(
  78. 'NodesUnlabeled',
  79. 'The following nodes are supposed to be labeled with {label} but are not:\n'
  80. ' {nodes}\n'
  81. 'Fluentd will not aggregate logs from these nodes.'.format(
  82. label=node_selector, nodes=', '.join(nodes_missing_labels)
  83. ))]
  84. return []
  85. @staticmethod
  86. def check_nodes_have_fluentd(pods, fluentd_nodes):
  87. """Make sure fluentd is on all the labeled nodes. Returns: error list"""
  88. unmatched_nodes = fluentd_nodes.copy()
  89. node_names_by_label = {
  90. node['metadata']['labels']['kubernetes.io/hostname']: name
  91. for name, node in fluentd_nodes.items()
  92. }
  93. node_names_by_internal_ip = {
  94. address['address']: name
  95. for name, node in fluentd_nodes.items()
  96. for address in node['status']['addresses']
  97. if address['type'] == "InternalIP"
  98. }
  99. for pod in pods:
  100. for name in [
  101. pod['spec']['nodeName'],
  102. node_names_by_internal_ip.get(pod['spec']['nodeName']),
  103. node_names_by_label.get(pod.get('spec', {}).get('host')),
  104. ]:
  105. unmatched_nodes.pop(name, None)
  106. if unmatched_nodes:
  107. return [OpenShiftCheckException(
  108. 'MissingFluentdPod',
  109. 'The following nodes are supposed to have a Fluentd pod but do not:\n'
  110. ' {nodes}\n'
  111. 'These nodes will not have their logs aggregated.'.format(
  112. nodes='\n '.join(unmatched_nodes.keys())
  113. ))]
  114. return []
  115. def check_fluentd_pods_running(self, pods):
  116. """Make sure all fluentd pods are running. Returns: error string"""
  117. not_running = super(Fluentd, self).not_running_pods(pods)
  118. if not_running:
  119. return [OpenShiftCheckException(
  120. 'FluentdNotRunning',
  121. 'The following Fluentd pods are supposed to be running but are not:\n'
  122. ' {pods}\n'
  123. 'These pods will not aggregate logs from their nodes.'.format(
  124. pods='\n'.join(
  125. " {name} ({host})".format(
  126. name=pod['metadata']['name'],
  127. host=pod['spec'].get('host', 'None')
  128. )
  129. for pod in not_running
  130. )
  131. ))]
  132. return []