elasticsearch.py

  1. """
  2. Module for performing checks on an Elasticsearch logging deployment
  3. """
  4. import json
  5. import re
  6. from openshift_checks import get_var
  7. from openshift_checks.logging.logging import LoggingCheck
  8. class Elasticsearch(LoggingCheck):
  9. """Module that checks an integrated logging Elasticsearch deployment"""
  10. name = "elasticsearch"
  11. tags = ["health", "logging"]
  12. logging_namespace = None
  13. def run(self, tmp, task_vars):
  14. """Check various things and gather errors. Returns: result as hash"""
  15. self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging")
  16. es_pods, error = super(Elasticsearch, self).get_pods_for_component(
  17. self.execute_module,
  18. self.logging_namespace,
  19. "es",
  20. task_vars,
  21. )
  22. if error:
  23. return {"failed": True, "changed": False, "msg": error}
  24. check_error = self.check_elasticsearch(es_pods, task_vars)
  25. if check_error:
  26. msg = ("The following Elasticsearch deployment issue was found:"
  27. "\n-------\n"
  28. "{}".format(check_error))
  29. return {"failed": True, "changed": False, "msg": msg}
  30. # TODO(lmeyer): run it all again for the ops cluster
  31. return {"failed": False, "changed": False, "msg": 'No problems found with Elasticsearch deployment.'}
    def _not_running_elasticsearch_pods(self, es_pods):
        """Returns: list of not-running pods, list of errors about those pods"""
        not_running = super(Elasticsearch, self).not_running_pods(es_pods)
        if not_running:
            return not_running, [(
                'The following Elasticsearch pods are not running:\n'
                '{pods}'
                'These pods will not aggregate logs from their nodes.'
            ).format(pods=''.join(
                " {} ({})\n".format(pod['metadata']['name'], pod['spec'].get('host', 'None'))
                for pod in not_running
            ))]
        return not_running, []
    def check_elasticsearch(self, es_pods, task_vars):
        """Various checks for elasticsearch. Returns: error string"""
        not_running_pods, error_msgs = self._not_running_elasticsearch_pods(es_pods)
        running_pods = [pod for pod in es_pods if pod not in not_running_pods]
        pods_by_name = {
            pod['metadata']['name']: pod for pod in running_pods
            # Filter out pods that are not members of a DC
            if pod['metadata'].get('labels', {}).get('deploymentconfig')
        }
        if not pods_by_name:
            return 'No logging Elasticsearch pods were found. Is logging deployed?'

        error_msgs += self._check_elasticsearch_masters(pods_by_name, task_vars)
        error_msgs += self._check_elasticsearch_node_list(pods_by_name, task_vars)
        error_msgs += self._check_es_cluster_health(pods_by_name, task_vars)
        error_msgs += self._check_elasticsearch_diskspace(pods_by_name, task_vars)
        return '\n'.join(error_msgs)
    @staticmethod
    def _build_es_curl_cmd(pod_name, url):
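        # A minimal sketch of the command this helper renders (the pod name below is
        # hypothetical). The string is handed to _exec_oc, which runs it as `oc`
        # arguments, exec-ing curl inside the pod with the admin client certs mounted
        # from the logging secret:
        #   exec logging-es-abc123 -- curl -s \
        #     --cert /etc/elasticsearch/secret/admin-cert \
        #     --key /etc/elasticsearch/secret/admin-key \
        #     --cacert /etc/elasticsearch/secret/admin-ca \
        #     -XGET 'https://localhost:9200/_cat/master'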
  63. base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'"
  64. return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url)
    def _check_elasticsearch_masters(self, pods_by_name, task_vars):
        """Check that Elasticsearch masters are sane. Returns: list of error strings"""
        es_master_names = set()
        error_msgs = []
        for pod_name in pods_by_name.keys():
            # Ask each ES node which node it considers master, to detect split brain
            get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master")
            master_name_str = self._exec_oc(get_master_cmd, [], task_vars)
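            # Illustrative _cat/master response (values are hypothetical): a single
            # whitespace-separated line of "id host ip node", e.g.
            #   "0OifwziyT... 10.128.0.95 10.128.0.95 logging-es-data-master-abc123"
            # so the second field below is the host the pod reports as master.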
            master_names = (master_name_str or '').split(' ')
            if len(master_names) > 1:
                es_master_names.add(master_names[1])
            else:
                error_msgs.append(
                    'No master? Elasticsearch {pod} returned an unexpected response when asked for its master:\n'
                    ' {response}'.format(pod=pod_name, response=master_name_str)
                )

        if not es_master_names:
            error_msgs.append('No logging Elasticsearch masters were found. Is logging deployed?')
            return error_msgs

        if len(es_master_names) > 1:
            error_msgs.append(
                'Found multiple Elasticsearch masters according to the pods:\n'
                '{master_list}\n'
                'This implies that the masters have "split brain" and are not correctly\n'
                'replicating data for the logging cluster. Log loss is likely to occur.'
                .format(master_list='\n'.join(' ' + master for master in es_master_names))
            )

        return error_msgs
    def _check_elasticsearch_node_list(self, pods_by_name, task_vars):
        """Check that the nodes reported by the ES cluster are accounted for by pods. Returns: list of error strings"""
        if not pods_by_name:
            return ['No logging Elasticsearch masters were found. Is logging deployed?']

        # get ES cluster nodes
        node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes')
        cluster_node_data = self._exec_oc(node_cmd, [], task_vars)
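        # Sketch of the relevant part of the _nodes response (field values are
        # hypothetical); only each node's "host" field is used below:
        #   {"nodes": {"<node-id>": {"host": "logging-es-data-master-abc123", ...}, ...}}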
        try:
            cluster_nodes = json.loads(cluster_node_data)['nodes']
        except (ValueError, KeyError):
            return [
                'Failed to query Elasticsearch for the list of ES nodes. The output was:\n' +
                cluster_node_data
            ]

        # Try to match all ES-reported node hosts to known pods.
        error_msgs = []
        for node in cluster_nodes.values():
            # Note that with 1.4/3.4 the pod IP may be used as the master name
            if not any(node['host'] in (pod_name, pod['status'].get('podIP'))
                       for pod_name, pod in pods_by_name.items()):
                error_msgs.append(
                    'The Elasticsearch cluster reports a member node "{node}"\n'
                    'that does not correspond to any known ES pod.'.format(node=node['host'])
                )

        return error_msgs
    def _check_es_cluster_health(self, pods_by_name, task_vars):
        """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors"""
        error_msgs = []
        for pod_name in pods_by_name.keys():
            cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true')
            cluster_health_data = self._exec_oc(cluster_health_cmd, [], task_vars)
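            # The _cluster/health endpoint returns JSON whose "status" field is one of
            # "green", "yellow", or "red"; an abbreviated, hypothetical example:
            #   {"cluster_name": "logging-es", "status": "green", ...}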
            try:
                health_res = json.loads(cluster_health_data)
                if not health_res or not health_res.get('status'):
                    raise ValueError()
            except ValueError:
                error_msgs.append(
                    'Could not retrieve cluster health status from logging ES pod "{pod}".\n'
                    'Response was:\n{output}'.format(pod=pod_name, output=cluster_health_data)
                )
                continue

            if health_res['status'] not in ['green', 'yellow']:
                error_msgs.append(
                    'Elasticsearch cluster health status is RED according to pod "{}"'.format(pod_name)
                )

        return error_msgs
    def _check_elasticsearch_diskspace(self, pods_by_name, task_vars):
        """
        Exec into an ES pod and query the diskspace on the persistent volume.
        Returns: list of errors
        """
        error_msgs = []
        for pod_name in pods_by_name.keys():
            df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name)
            disk_output = self._exec_oc(df_cmd, [], task_vars)
            lines = disk_output.splitlines()
            # expecting one header looking like 'IUse% Use%' and one body line
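            # e.g. (illustrative values from GNU df with --output=ipcent,pcent):
            #   IUse% Use%
            #      3%  28%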
            body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$'
            if len(lines) != 2 or len(lines[0].split()) != 2 or not re.match(body_re, lines[1]):
                error_msgs.append(
                    'Could not retrieve storage usage from logging ES pod "{pod}".\n'
                    'Response to `df` command was:\n{output}'.format(pod=pod_name, output=disk_output)
                )
                continue
            inode_pct, disk_pct = re.match(body_re, lines[1]).groups()

            inode_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_inode_pct', default='90')
            if int(inode_pct) >= int(inode_pct_thresh):
                error_msgs.append(
                    'Inode percent usage on the storage volume for logging ES pod "{pod}"\n'
                    ' is {pct}, greater than threshold {limit}.\n'
                    ' Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(inode_pct),
                        limit=str(inode_pct_thresh),
                        param='openshift_check_efk_es_inode_pct',
                    ))

            disk_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_storage_pct', default='80')
            if int(disk_pct) >= int(disk_pct_thresh):
                error_msgs.append(
                    'Disk percent usage on the storage volume for logging ES pod "{pod}"\n'
                    ' is {pct}, greater than threshold {limit}.\n'
                    ' Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(disk_pct),
                        limit=str(disk_pct_thresh),
                        param='openshift_check_efk_es_storage_pct',
                    ))

        return error_msgs
    def _exec_oc(self, cmd_str, extra_args, task_vars):
        return super(Elasticsearch, self).exec_oc(
            self.execute_module,
            self.logging_namespace,
            cmd_str,
            extra_args,
            task_vars,
        )