Browse Source

Merge pull request #4682 from juanvallejo/jvallejo/verify-logging-index-time

verify sane log times in logging stack
Rodolfo Carvalho 7 years ago
parent
commit
c5e782db1b

+ 6 - 5
roles/openshift_health_checker/openshift_checks/logging/logging.py

@@ -12,20 +12,21 @@ class LoggingCheck(OpenShiftCheck):
     """Base class for logging component checks"""
 
     name = "logging"
+    logging_namespace = "logging"
 
     @classmethod
     def is_active(cls, task_vars):
-        return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars)
+        logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=False)
+        return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) and logging_deployed
 
     @staticmethod
     def is_first_master(task_vars):
-        """Run only on first master and only when logging is configured. Returns: bool"""
-        logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=True)
+        """Run only on first master. Returns: bool"""
         # Note: It would be nice to use membership in oo_first_master group, however for now it
         # seems best to avoid requiring that setup and just check this is the first master.
         hostname = get_var(task_vars, "ansible_ssh_host") or [None]
         masters = get_var(task_vars, "groups", "masters", default=None) or [None]
-        return logging_deployed and masters[0] == hostname
+        return masters and masters[0] == hostname
 
     def run(self, tmp, task_vars):
         pass
@@ -45,7 +46,7 @@ class LoggingCheck(OpenShiftCheck):
                 raise ValueError()
         except ValueError:
             # successful run but non-parsing data generally means there were no pods in the namespace
-            return None, 'There are no pods in the {} namespace. Is logging deployed?'.format(namespace)
+            return None, 'No pods were found for the "{}" logging component.'.format(logging_component)
 
         return pods['items'], None
 

+ 132 - 0
roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py

@@ -0,0 +1,132 @@
+"""
+Check for ensuring logs from pods can be queried in a reasonable amount of time.
+"""
+
+import json
+import time
+
+from uuid import uuid4
+
+from openshift_checks import get_var, OpenShiftCheckException
+from openshift_checks.logging.logging import LoggingCheck
+
+
+ES_CMD_TIMEOUT_SECONDS = 30
+
+
+class LoggingIndexTime(LoggingCheck):
+    """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time."""
+    name = "logging_index_time"
+    tags = ["health", "logging"]
+
+    logging_namespace = "logging"
+
+    def run(self, tmp, task_vars):
+        """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs."""
+        try:
+            log_index_timeout = int(
+                get_var(task_vars, "openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS)
+            )
+        except ValueError:
+            return {
+                "failed": True,
+                "msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". '
+                        'Value must be an integer representing an amount in seconds.'),
+            }
+
+        running_component_pods = dict()
+
+        # get all component pods
+        self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace)
+        for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']):
+            pods, error = self.get_pods_for_component(
+                self.execute_module, self.logging_namespace, component, task_vars,
+            )
+
+            if error:
+                msg = 'Unable to retrieve pods for the {} logging component: {}'
+                return {"failed": True, "changed": False, "msg": msg.format(name, error)}
+
+            running_pods = self.running_pods(pods)
+
+            if not running_pods:
+                msg = ('No {} pods in the "Running" state were found.'
+                       'At least one pod is required in order to perform this check.')
+                return {"failed": True, "changed": False, "msg": msg.format(name)}
+
+            running_component_pods[component] = running_pods
+
+        uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0], task_vars)
+        self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout, task_vars)
+        return {}
+
+    def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs, task_vars):
+        """Retry an Elasticsearch query every second until query success, or a defined
+        length of time has passed."""
+        deadline = time.time() + timeout_secs
+        interval = 1
+        while not self.query_es_from_es(es_pod, uuid, task_vars):
+            if time.time() + interval > deadline:
+                msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s."
+                raise OpenShiftCheckException(msg.format(uuid, timeout_secs))
+            time.sleep(interval)
+
+    def curl_kibana_with_uuid(self, kibana_pod, task_vars):
+        """curl Kibana with a unique uuid."""
+        uuid = self.generate_uuid()
+        pod_name = kibana_pod["metadata"]["name"]
+        exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}"
+        exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid)
+
+        error_str = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars)
+
+        try:
+            error_code = json.loads(error_str)["statusCode"]
+        except KeyError:
+            msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n'
+                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
+            raise OpenShiftCheckException(msg)
+        except ValueError:
+            msg = ('invalid response returned from Kibana request (Non-JSON output):\n'
+                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
+            raise OpenShiftCheckException(msg)
+
+        if error_code != 404:
+            msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.'
+            raise OpenShiftCheckException(msg.format(error_code))
+
+        return uuid
+
+    def query_es_from_es(self, es_pod, uuid, task_vars):
+        """curl the Elasticsearch pod and look for a unique uuid in its logs."""
+        pod_name = es_pod["metadata"]["name"]
+        exec_cmd = (
+            "exec {pod_name} -- curl --max-time 30 -s -f "
+            "--cacert /etc/elasticsearch/secret/admin-ca "
+            "--cert /etc/elasticsearch/secret/admin-cert "
+            "--key /etc/elasticsearch/secret/admin-key "
+            "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}"
+        )
+        exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid)
+        result = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars)
+
+        try:
+            count = json.loads(result)["count"]
+        except KeyError:
+            msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}'
+            raise OpenShiftCheckException(msg.format(exec_cmd, result))
+        except ValueError:
+            msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}'
+            raise OpenShiftCheckException(msg.format(exec_cmd, result))
+
+        return count
+
+    @staticmethod
+    def running_pods(pods):
+        """Filter pods that are running."""
+        return [pod for pod in pods if pod['status']['phase'] == 'Running']
+
+    @staticmethod
+    def generate_uuid():
+        """Wrap uuid generator. Allows for testing with expected values."""
+        return str(uuid4())

+ 1 - 1
roles/openshift_health_checker/test/logging_check_test.py

@@ -128,7 +128,7 @@ def test_is_active(groups, logging_deployed, is_active):
     (
         'No resources found.',
         None,
-        'There are no pods in the logging namespace',
+        'No pods were found for the "es"',
     ),
     (
         json.dumps({'items': [plain_kibana_pod, plain_es_pod, plain_curator_pod, fluentd_pod_node1]}),

+ 182 - 0
roles/openshift_health_checker/test/logging_index_time_test.py

@@ -0,0 +1,182 @@
+import json
+
+import pytest
+
+from openshift_checks.logging.logging_index_time import LoggingIndexTime, OpenShiftCheckException
+
+
+SAMPLE_UUID = "unique-test-uuid"
+
+
+def canned_loggingindextime(exec_oc=None):
+    """Create a check object with a canned exec_oc method"""
+    check = LoggingIndexTime("dummy")  # fails if a module is actually invoked
+    if exec_oc:
+        check.exec_oc = exec_oc
+    return check
+
+
+plain_running_elasticsearch_pod = {
+    "metadata": {
+        "labels": {"component": "es", "deploymentconfig": "logging-es-data-master"},
+        "name": "logging-es-data-master-1",
+    },
+    "status": {
+        "containerStatuses": [{"ready": True}, {"ready": True}],
+        "phase": "Running",
+    }
+}
+plain_running_kibana_pod = {
+    "metadata": {
+        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
+        "name": "logging-kibana-1",
+    },
+    "status": {
+        "containerStatuses": [{"ready": True}, {"ready": True}],
+        "phase": "Running",
+    }
+}
+not_running_kibana_pod = {
+    "metadata": {
+        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
+        "name": "logging-kibana-2",
+    },
+    "status": {
+        "containerStatuses": [{"ready": True}, {"ready": False}],
+        "conditions": [{"status": "True", "type": "Ready"}],
+        "phase": "pending",
+    }
+}
+
+
+@pytest.mark.parametrize('pods, expect_pods', [
+    (
+        [not_running_kibana_pod],
+        [],
+    ),
+    (
+        [plain_running_kibana_pod],
+        [plain_running_kibana_pod],
+    ),
+    (
+        [],
+        [],
+    )
+])
+def test_check_running_pods(pods, expect_pods):
+    check = canned_loggingindextime(None)
+    pods = check.running_pods(pods)
+    assert pods == expect_pods
+
+
+@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
+    (
+        'valid count in response',
+        {
+            "count": 1,
+        },
+        SAMPLE_UUID,
+        0.001,
+        [],
+    ),
+], ids=lambda argval: argval[0])
+def test_wait_until_cmd_or_err_succeeds(name, json_response, uuid, timeout, extra_words):
+    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
+        return json.dumps(json_response)
+
+    check = canned_loggingindextime(exec_oc)
+    check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)
+
+
+@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
+    (
+        'invalid json response',
+        {
+            "invalid_field": 1,
+        },
+        SAMPLE_UUID,
+        0.001,
+        ["invalid response", "Elasticsearch"],
+    ),
+    (
+        'empty response',
+        {},
+        SAMPLE_UUID,
+        0.001,
+        ["invalid response", "Elasticsearch"],
+    ),
+    (
+        'valid response but invalid match count',
+        {
+            "count": 0,
+        },
+        SAMPLE_UUID,
+        0.005,
+        ["expecting match", SAMPLE_UUID, "0.005s"],
+    )
+], ids=lambda argval: argval[0])
+def test_wait_until_cmd_or_err(name, json_response, uuid, timeout, extra_words):
+    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
+        return json.dumps(json_response)
+
+    check = canned_loggingindextime(exec_oc)
+    with pytest.raises(OpenShiftCheckException) as error:
+        check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)
+
+    for word in extra_words:
+        assert word in str(error)
+
+
+@pytest.mark.parametrize('name, json_response, uuid, extra_words', [
+    (
+        'correct response code, found unique id is returned',
+        {
+            "statusCode": 404,
+        },
+        "sample unique id",
+        ["sample unique id"],
+    ),
+], ids=lambda argval: argval[0])
+def test_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
+    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
+        return json.dumps(json_response)
+
+    check = canned_loggingindextime(exec_oc)
+    check.generate_uuid = lambda: uuid
+
+    result = check.curl_kibana_with_uuid(plain_running_kibana_pod, None)
+
+    for word in extra_words:
+        assert word in result
+
+
+@pytest.mark.parametrize('name, json_response, uuid, extra_words', [
+    (
+        'invalid json response',
+        {
+            "invalid_field": "invalid",
+        },
+        SAMPLE_UUID,
+        ["invalid response returned", 'Missing "statusCode" key'],
+    ),
+    (
+        'wrong error code in response',
+        {
+            "statusCode": 500,
+        },
+        SAMPLE_UUID,
+        ["Expecting error code", "500"],
+    ),
+], ids=lambda argval: argval[0])
+def test_failed_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
+    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
+        return json.dumps(json_response)
+
+    check = canned_loggingindextime(exec_oc)
+    check.generate_uuid = lambda: uuid
+
+    with pytest.raises(OpenShiftCheckException) as error:
+        check.curl_kibana_with_uuid(plain_running_kibana_pod, None)
+
+    for word in extra_words:
+        assert word in str(error)