elasticsearch.py

  1. """Check for an aggregated logging Elasticsearch deployment"""
  2. import json
  3. import re
  4. from openshift_checks import OpenShiftCheckException, OpenShiftCheckExceptionList
  5. from openshift_checks.logging.logging import LoggingCheck
  6. class Elasticsearch(LoggingCheck):
  7. """Check for an aggregated logging Elasticsearch deployment"""
  8. name = "elasticsearch"
  9. tags = ["health", "logging"]
    def run(self):
        """Check various things and gather errors. Returns: result as dict"""

        es_pods = self.get_pods_for_component("es")
        self.check_elasticsearch(es_pods)
        # TODO(lmeyer): run it all again for the ops cluster
        return {}
    def check_elasticsearch(self, es_pods):
        """Perform checks for Elasticsearch. Raises OpenShiftCheckExceptionList on any errors."""
        running_pods, errors = self.running_elasticsearch_pods(es_pods)
        pods_by_name = {
            pod['metadata']['name']: pod for pod in running_pods
            # Filter out pods that are not members of a DC
            if pod['metadata'].get('labels', {}).get('deploymentconfig')
        }
        if not pods_by_name:
            # nothing running, cannot run the rest of the check
            errors.append(OpenShiftCheckException(
                'NoRunningPods',
                'No logging Elasticsearch pods were found running, so no logs are being aggregated.'
            ))
            raise OpenShiftCheckExceptionList(errors)

        errors += self.check_elasticsearch_masters(pods_by_name)
        errors += self.check_elasticsearch_node_list(pods_by_name)
        errors += self.check_es_cluster_health(pods_by_name)
        errors += self.check_elasticsearch_diskspace(pods_by_name)
        if errors:
            raise OpenShiftCheckExceptionList(errors)
    def running_elasticsearch_pods(self, es_pods):
        """Returns: list of running pods, list of errors about non-running pods"""
        not_running = self.not_running_pods(es_pods)
        running_pods = [pod for pod in es_pods if pod not in not_running]
        if not_running:
            return running_pods, [OpenShiftCheckException(
                'PodNotRunning',
                'The following Elasticsearch pods are defined but not running:\n'
                '{pods}'.format(pods=''.join(
                    " {} ({})\n".format(pod['metadata']['name'], pod['spec'].get('host', 'None'))
                    for pod in not_running
                ))
            )]
        return running_pods, []
    @staticmethod
    def _build_es_curl_cmd(pod_name, url):
        base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'"
        return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url)
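    # For a hypothetical pod, the helper above builds an `oc exec` argument string such as:
    #   exec logging-es-data-master-abc123 -- curl -s \
    #     --cert /etc/elasticsearch/secret/admin-cert --key /etc/elasticsearch/secret/admin-key \
    #     --cacert /etc/elasticsearch/secret/admin-ca -XGET 'https://localhost:9200/_cat/master'
    # (the pod name is illustrative; the certificate paths come from the format string above)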
    def check_elasticsearch_masters(self, pods_by_name):
        """Check that Elasticsearch masters are sane. Returns: list of errors"""
        es_master_names = set()
        errors = []
        for pod_name in pods_by_name.keys():
            # Ask each ES node which node it sees as master, checking for split brain
            get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master")
            master_name_str = self.exec_oc(get_master_cmd, [], save_as_name="get_master_names.json")
            master_names = (master_name_str or '').split(' ')
            if len(master_names) > 1:
                es_master_names.add(master_names[1])
            else:
                errors.append(OpenShiftCheckException(
                    'NoMasterName',
                    'Elasticsearch {pod} gave unexpected response when asked master name:\n'
                    ' {response}'.format(pod=pod_name, response=master_name_str)
                ))

        if not es_master_names:
            errors.append(OpenShiftCheckException(
                'NoMasterFound',
                'No logging Elasticsearch masters were found.'
            ))
            return errors

        if len(es_master_names) > 1:
            errors.append(OpenShiftCheckException(
                'SplitBrainMasters',
                'Found multiple Elasticsearch masters according to the pods:\n'
                '{master_list}\n'
                'This implies that the masters have "split brain" and are not correctly\n'
                'replicating data for the logging cluster. Log loss is likely to occur.'
                .format(master_list='\n'.join(' ' + master for master in es_master_names))
            ))

        return errors
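    # `_cat/master` typically answers with one whitespace-separated line of the form
    #   <node-id> <host> <ip> <node-name>
    # so field [1] above is taken as the master's host (columns are illustrative and
    # can vary with the Elasticsearch version).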
    def check_elasticsearch_node_list(self, pods_by_name):
        """Check that the nodes reported by the ES cluster are accounted for by pods. Returns: list of errors"""

        if not pods_by_name:
            return [OpenShiftCheckException(
                'MissingComponentPods',
                'No logging Elasticsearch pods were found.'
            )]

        # get ES cluster nodes
        node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes')
        cluster_node_data = self.exec_oc(node_cmd, [], save_as_name="get_es_nodes.json")
        try:
            cluster_nodes = json.loads(cluster_node_data)['nodes']
        except (ValueError, KeyError):
            return [OpenShiftCheckException(
                'MissingNodeList',
                'Failed to query Elasticsearch for the list of ES nodes. The output was:\n' +
                cluster_node_data
            )]

        # Try to match all ES-reported node hosts to known pods.
        errors = []
        for node in cluster_nodes.values():
            # Note that with 1.4/3.4 the pod IP may be used as the master name
            if not any(node['host'] in (pod_name, pod['status'].get('podIP'))
                       for pod_name, pod in pods_by_name.items()):
                errors.append(OpenShiftCheckException(
                    'EsPodNodeMismatch',
                    'The Elasticsearch cluster reports a member node "{node}"\n'
                    'that does not correspond to any known ES pod.'.format(node=node['host'])
                ))

        return errors
    def check_es_cluster_health(self, pods_by_name):
        """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors"""
        errors = []
        for pod_name in pods_by_name.keys():
            cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true')
            cluster_health_data = self.exec_oc(cluster_health_cmd, [], save_as_name='get_es_health.json')
            try:
                health_res = json.loads(cluster_health_data)
                if not health_res or not health_res.get('status'):
                    raise ValueError()
            except ValueError:
                errors.append(OpenShiftCheckException(
                    'BadEsResponse',
                    'Could not retrieve cluster health status from logging ES pod "{pod}".\n'
                    'Response was:\n{output}'.format(pod=pod_name, output=cluster_health_data)
                ))
                continue

            if health_res['status'] not in ['green', 'yellow']:
                errors.append(OpenShiftCheckException(
                    'EsClusterHealthRed',
                    'Elasticsearch cluster health status is RED according to pod "{}"'.format(pod_name)
                ))

        return errors
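    # A healthy `_cluster/health` response is JSON along the lines of
    #   {"cluster_name": "logging-es", "status": "green", "number_of_nodes": 3, ...}
    # (fields shown are illustrative); any status other than "green" or "yellow" is
    # reported as EsClusterHealthRed by the method above.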
    def check_elasticsearch_diskspace(self, pods_by_name):
        """
        Exec into an ES pod and query the diskspace on the persistent volume.
        Returns: list of errors
        """
        errors = []
        for pod_name in pods_by_name.keys():
            df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name)
            disk_output = self.exec_oc(df_cmd, [], save_as_name='get_pv_diskspace.json')
            lines = disk_output.splitlines()
            # expecting one header looking like 'IUse% Use%' and one body line
            body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$'
            if len(lines) != 2 or len(lines[0].split()) != 2 or not re.match(body_re, lines[1]):
                errors.append(OpenShiftCheckException(
                    'BadDfResponse',
                    'Could not retrieve storage usage from logging ES pod "{pod}".\n'
                    'Response to `df` command was:\n{output}'.format(pod=pod_name, output=disk_output)
                ))
                continue
            inode_pct, disk_pct = re.match(body_re, lines[1]).groups()

            inode_pct_thresh = self.get_var('openshift_check_efk_es_inode_pct', default='90')
            if int(inode_pct) >= int(inode_pct_thresh):
                errors.append(OpenShiftCheckException(
                    'InodeUsageTooHigh',
                    'Inode percent usage on the storage volume for logging ES pod "{pod}"\n'
                    ' is {pct}, greater than threshold {limit}.\n'
                    ' Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(inode_pct),
                        limit=str(inode_pct_thresh),
                        param='openshift_check_efk_es_inode_pct',
                    )))

            disk_pct_thresh = self.get_var('openshift_check_efk_es_storage_pct', default='80')
            if int(disk_pct) >= int(disk_pct_thresh):
                errors.append(OpenShiftCheckException(
                    'DiskUsageTooHigh',
                    'Disk percent usage on the storage volume for logging ES pod "{pod}"\n'
                    ' is {pct}, greater than threshold {limit}.\n'
                    ' Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(disk_pct),
                        limit=str(disk_pct_thresh),
                        param='openshift_check_efk_es_storage_pct',
                    )))

        return errors
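
# A minimal sketch of the `df` parsing done in check_elasticsearch_diskspace, assuming
# typical `df --output=ipcent,pcent` output (values are illustrative):
#
#     body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$'
#     disk_output = 'IUse% Use%\n   5%   62%\n'
#     lines = disk_output.splitlines()
#     re.match(body_re, lines[1]).groups()   # -> ('5', '62')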