logging_index_time.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. """
  2. Check for ensuring logs from pods can be queried in a reasonable amount of time.
  3. """
  4. import json
  5. import time
  6. from uuid import uuid4
  7. from openshift_checks import OpenShiftCheckException
  8. from openshift_checks.logging.logging import LoggingCheck
# Default number of seconds to keep polling Elasticsearch for the test log
# entry; used when "openshift_check_logging_index_timeout_seconds" is not set.
ES_CMD_TIMEOUT_SECONDS = 30
  10. class LoggingIndexTime(LoggingCheck):
  11. """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time."""
  12. name = "logging_index_time"
  13. tags = ["health", "logging"]
  14. logging_namespace = "logging"
  15. def run(self):
  16. """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs."""
  17. try:
  18. log_index_timeout = int(
  19. self.get_var("openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS)
  20. )
  21. except ValueError:
  22. return {
  23. "failed": True,
  24. "msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". '
  25. 'Value must be an integer representing an amount in seconds.'),
  26. }
  27. running_component_pods = dict()
  28. # get all component pods
  29. self.logging_namespace = self.get_var("openshift_logging_namespace", default=self.logging_namespace)
  30. for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']):
  31. pods, error = self.get_pods_for_component(self.logging_namespace, component)
  32. if error:
  33. msg = 'Unable to retrieve pods for the {} logging component: {}'
  34. return {"failed": True, "changed": False, "msg": msg.format(name, error)}
  35. running_pods = self.running_pods(pods)
  36. if not running_pods:
  37. msg = ('No {} pods in the "Running" state were found.'
  38. 'At least one pod is required in order to perform this check.')
  39. return {"failed": True, "changed": False, "msg": msg.format(name)}
  40. running_component_pods[component] = running_pods
  41. uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0])
  42. self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout)
  43. return {}
  44. def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs):
  45. """Retry an Elasticsearch query every second until query success, or a defined
  46. length of time has passed."""
  47. deadline = time.time() + timeout_secs
  48. interval = 1
  49. while not self.query_es_from_es(es_pod, uuid):
  50. if time.time() + interval > deadline:
  51. msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s."
  52. raise OpenShiftCheckException(msg.format(uuid, timeout_secs))
  53. time.sleep(interval)
  54. def curl_kibana_with_uuid(self, kibana_pod):
  55. """curl Kibana with a unique uuid."""
  56. uuid = self.generate_uuid()
  57. pod_name = kibana_pod["metadata"]["name"]
  58. exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}"
  59. exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid)
  60. error_str = self.exec_oc(self.logging_namespace, exec_cmd, [])
  61. try:
  62. error_code = json.loads(error_str)["statusCode"]
  63. except KeyError:
  64. msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n'
  65. 'Command: {}\nResponse: {}').format(exec_cmd, error_str)
  66. raise OpenShiftCheckException(msg)
  67. except ValueError:
  68. msg = ('invalid response returned from Kibana request (Non-JSON output):\n'
  69. 'Command: {}\nResponse: {}').format(exec_cmd, error_str)
  70. raise OpenShiftCheckException(msg)
  71. if error_code != 404:
  72. msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.'
  73. raise OpenShiftCheckException(msg.format(error_code))
  74. return uuid
  75. def query_es_from_es(self, es_pod, uuid):
  76. """curl the Elasticsearch pod and look for a unique uuid in its logs."""
  77. pod_name = es_pod["metadata"]["name"]
  78. exec_cmd = (
  79. "exec {pod_name} -- curl --max-time 30 -s -f "
  80. "--cacert /etc/elasticsearch/secret/admin-ca "
  81. "--cert /etc/elasticsearch/secret/admin-cert "
  82. "--key /etc/elasticsearch/secret/admin-key "
  83. "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}"
  84. )
  85. exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid)
  86. result = self.exec_oc(self.logging_namespace, exec_cmd, [])
  87. try:
  88. count = json.loads(result)["count"]
  89. except KeyError:
  90. msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}'
  91. raise OpenShiftCheckException(msg.format(exec_cmd, result))
  92. except ValueError:
  93. msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}'
  94. raise OpenShiftCheckException(msg.format(exec_cmd, result))
  95. return count
  96. @staticmethod
  97. def running_pods(pods):
  98. """Filter pods that are running."""
  99. return [pod for pod in pods if pod['status']['phase'] == 'Running']
  100. @staticmethod
  101. def generate_uuid():
  102. """Wrap uuid generator. Allows for testing with expected values."""
  103. return str(uuid4())