elasticsearch.py

  1. """Check for an aggregated logging Elasticsearch deployment"""
  2. import json
  3. import re
  4. from openshift_checks import get_var
  5. from openshift_checks.logging.logging import LoggingCheck
  6. class Elasticsearch(LoggingCheck):
  7. """Check for an aggregated logging Elasticsearch deployment"""
  8. name = "elasticsearch"
  9. tags = ["health", "logging"]
  10. logging_namespace = None

    def run(self, tmp, task_vars):
        """Check various things and gather errors. Returns: result as hash"""

        self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging")
        es_pods, error = super(Elasticsearch, self).get_pods_for_component(
            self.execute_module,
            self.logging_namespace,
            "es",
            task_vars,
        )
        if error:
            return {"failed": True, "changed": False, "msg": error}
        check_error = self.check_elasticsearch(es_pods, task_vars)

        if check_error:
            msg = ("The following Elasticsearch deployment issue was found:"
                   "\n-------\n"
                   "{}".format(check_error))
            return {"failed": True, "changed": False, "msg": msg}

        # TODO(lmeyer): run it all again for the ops cluster
        return {"failed": False, "changed": False, "msg": 'No problems found with Elasticsearch deployment.'}

    def _not_running_elasticsearch_pods(self, es_pods):
        """Returns: list of pods that are not running, list of errors about non-running pods"""
        not_running = super(Elasticsearch, self).not_running_pods(es_pods)
        if not_running:
            return not_running, [(
                'The following Elasticsearch pods are not running:\n'
                '{pods}'
                'These pods will not aggregate logs from their nodes.'
            ).format(pods=''.join(
                " {} ({})\n".format(pod['metadata']['name'], pod['spec'].get('host', 'None'))
                for pod in not_running
            ))]
        return not_running, []

    def check_elasticsearch(self, es_pods, task_vars):
        """Various checks for elasticsearch. Returns: error string"""
        not_running_pods, error_msgs = self._not_running_elasticsearch_pods(es_pods)
        running_pods = [pod for pod in es_pods if pod not in not_running_pods]
        pods_by_name = {
            pod['metadata']['name']: pod for pod in running_pods
            # Filter out pods that are not members of a DC
            if pod['metadata'].get('labels', {}).get('deploymentconfig')
        }
        if not pods_by_name:
            return 'No logging Elasticsearch pods were found. Is logging deployed?'
        error_msgs += self._check_elasticsearch_masters(pods_by_name, task_vars)
        error_msgs += self._check_elasticsearch_node_list(pods_by_name, task_vars)
        error_msgs += self._check_es_cluster_health(pods_by_name, task_vars)
        error_msgs += self._check_elasticsearch_diskspace(pods_by_name, task_vars)
        return '\n'.join(error_msgs)
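
    # For illustration only (hypothetical pod name): _build_es_curl_cmd turns a pod name
    # and URL into the argument string handed to `oc`, roughly:
    #   exec logging-es-abc123 -- curl -s
    #     --cert /etc/elasticsearch/secret/admin-cert
    #     --key /etc/elasticsearch/secret/admin-key
    #     --cacert /etc/elasticsearch/secret/admin-ca
    #     -XGET 'https://localhost:9200/_cat/master'
    # (wrapped here for readability; the command is built as a single line)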

    @staticmethod
    def _build_es_curl_cmd(pod_name, url):
        base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'"
        return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url)
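
    # Assumed response format for _cat/master: a single whitespace-separated line of
    # "id host ip node-name" (the values below are hypothetical):
    #   Aq71PR6WTZekQ logging-es-abc123 10.128.0.12 logging-es-abc123
    # The check below takes the second field (host) as the reported master and flags
    # "split brain" when different pods disagree about it.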

    def _check_elasticsearch_masters(self, pods_by_name, task_vars):
        """Check that Elasticsearch masters are sane. Returns: list of error strings"""
        es_master_names = set()
        error_msgs = []
        for pod_name in pods_by_name.keys():
            # Compare what each ES node reports as master and compare for split brain
            get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master")
            master_name_str = self._exec_oc(get_master_cmd, [], task_vars)
            master_names = (master_name_str or '').split(' ')
            if len(master_names) > 1:
                es_master_names.add(master_names[1])
            else:
                error_msgs.append(
                    'No master? Elasticsearch {pod} returned bad string when asked master name:\n'
                    ' {response}'.format(pod=pod_name, response=master_name_str)
                )

        if not es_master_names:
            error_msgs.append('No logging Elasticsearch masters were found. Is logging deployed?')
            return error_msgs

        if len(es_master_names) > 1:
            error_msgs.append(
                'Found multiple Elasticsearch masters according to the pods:\n'
                '{master_list}\n'
                'This implies that the masters have "split brain" and are not correctly\n'
                'replicating data for the logging cluster. Log loss is likely to occur.'
                .format(master_list='\n'.join(' ' + master for master in es_master_names))
            )

        return error_msgs
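
    # A minimal sketch of the /_nodes JSON the next check relies on (assumed shape; only
    # the field read below is shown, and the values are hypothetical):
    #   {"nodes": {"Aq71PR6WTZekQ": {"host": "logging-es-abc123", ...}, ...}}
    # Every reported "host" must match a known ES pod name or pod IP.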

    def _check_elasticsearch_node_list(self, pods_by_name, task_vars):
        """Check that reported ES masters are accounted for by pods. Returns: list of error strings"""

        if not pods_by_name:
            return ['No logging Elasticsearch masters were found. Is logging deployed?']

        # get ES cluster nodes
        node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes')
        cluster_node_data = self._exec_oc(node_cmd, [], task_vars)
        try:
            cluster_nodes = json.loads(cluster_node_data)['nodes']
        except (ValueError, KeyError):
            return [
                'Failed to query Elasticsearch for the list of ES nodes. The output was:\n' +
                cluster_node_data
            ]

        # Try to match all ES-reported node hosts to known pods.
        error_msgs = []
        for node in cluster_nodes.values():
            # Note that with 1.4/3.4 the pod IP may be used as the master name
            if not any(node['host'] in (pod_name, pod['status'].get('podIP'))
                       for pod_name, pod in pods_by_name.items()):
                error_msgs.append(
                    'The Elasticsearch cluster reports a member node "{node}"\n'
                    'that does not correspond to any known ES pod.'.format(node=node['host'])
                )

        return error_msgs
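
    # A minimal sketch of the _cluster/health JSON parsed below (assumed shape; the
    # values are hypothetical):
    #   {"cluster_name": "logging-es", "status": "green", ...}
    # Any status other than "green" or "yellow" is reported as an error.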

    def _check_es_cluster_health(self, pods_by_name, task_vars):
        """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors"""
        error_msgs = []
        for pod_name in pods_by_name.keys():
            cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true')
            cluster_health_data = self._exec_oc(cluster_health_cmd, [], task_vars)
            try:
                health_res = json.loads(cluster_health_data)
                if not health_res or not health_res.get('status'):
                    raise ValueError()
            except ValueError:
                error_msgs.append(
                    'Could not retrieve cluster health status from logging ES pod "{pod}".\n'
                    'Response was:\n{output}'.format(pod=pod_name, output=cluster_health_data)
                )
                continue

            if health_res['status'] not in ['green', 'yellow']:
                error_msgs.append(
                    'Elasticsearch cluster health status is RED according to pod "{}"'.format(pod_name)
                )

        return error_msgs
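
    # Expected output of `df --output=ipcent,pcent` on the ES persistent volume (assumed;
    # one header line plus one body line, percentages here are hypothetical):
    #   IUse% Use%
    #      1%  45%
    # The body line is matched against body_re below to extract inode and disk usage.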

    def _check_elasticsearch_diskspace(self, pods_by_name, task_vars):
        """
        Exec into an ES pod and query the diskspace on the persistent volume.
        Returns: list of errors
        """
        error_msgs = []
        for pod_name in pods_by_name.keys():
            df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name)
            disk_output = self._exec_oc(df_cmd, [], task_vars)
            lines = disk_output.splitlines()
            # expecting one header looking like 'IUse% Use%' and one body line
            body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$'
            if len(lines) != 2 or len(lines[0].split()) != 2 or not re.match(body_re, lines[1]):
                error_msgs.append(
                    'Could not retrieve storage usage from logging ES pod "{pod}".\n'
                    'Response to `df` command was:\n{output}'.format(pod=pod_name, output=disk_output)
                )
                continue
            inode_pct, disk_pct = re.match(body_re, lines[1]).groups()

            inode_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_inode_pct', default='90')
            if int(inode_pct) >= int(inode_pct_thresh):
                error_msgs.append(
                    'Inode percent usage on the storage volume for logging ES pod "{pod}"\n'
                    ' is {pct}, greater than threshold {limit}.\n'
                    ' Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(inode_pct),
                        limit=str(inode_pct_thresh),
                        param='openshift_check_efk_es_inode_pct',
                    ))
            disk_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_storage_pct', default='80')
            if int(disk_pct) >= int(disk_pct_thresh):
                error_msgs.append(
                    'Disk percent usage on the storage volume for logging ES pod "{pod}"\n'
                    ' is {pct}, greater than threshold {limit}.\n'
                    ' Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(disk_pct),
                        limit=str(disk_pct_thresh),
                        param='openshift_check_efk_es_storage_pct',
                    ))

        return error_msgs

    def _exec_oc(self, cmd_str, extra_args, task_vars):
        return super(Elasticsearch, self).exec_oc(
            self.execute_module,
            self.logging_namespace,
            cmd_str,
            extra_args,
            task_vars,
        )