Browse Source

Merge pull request #3846 from rhcarvalho/cs-unit-tests

Merged by openshift-bot
OpenShift Bot 8 năm trước cách đây
mục cha
commit
ec80b0c7d1

+ 34 - 26
roles/openshift_health_checker/action_plugins/openshift_health_check.py

@@ -4,6 +4,7 @@ Ansible action plugin to execute health checks in OpenShift clusters.
 # pylint: disable=wrong-import-position,missing-docstring,invalid-name
 import sys
 import os
+from collections import defaultdict
 
 try:
     from __main__ import display
@@ -41,20 +42,11 @@ class ActionModule(ActionBase):
             return result
 
         args = self._task.args
-        requested_checks = resolve_checks(args.get("checks", []), known_checks.values())
-
-        unknown_checks = requested_checks - set(known_checks)
-        if unknown_checks:
-            result["failed"] = True
-            result["msg"] = (
-                "One or more checks are unknown: {}. "
-                "Make sure there is no typo in the playbook and no files are missing."
-            ).format(", ".join(unknown_checks))
-            return result
+        resolved_checks = resolve_checks(args.get("checks", []), known_checks.values())
 
         result["checks"] = check_results = {}
 
-        for check_name in requested_checks & set(known_checks):
+        for check_name in resolved_checks:
             display.banner("CHECK [{} : {}]".format(check_name, task_vars["ansible_host"]))
             check = known_checks[check_name]
 
@@ -81,10 +73,7 @@ class ActionModule(ActionBase):
         load_checks()
 
         known_checks = {}
-
-        known_check_classes = set(cls for cls in OpenShiftCheck.subclasses())
-
-        for cls in known_check_classes:
+        for cls in OpenShiftCheck.subclasses():
             check_name = cls.name
             if check_name in known_checks:
                 other_cls = known_checks[check_name].__class__
@@ -94,26 +83,45 @@ class ActionModule(ActionBase):
                         cls.__module__, cls.__name__,
                         other_cls.__module__, other_cls.__name__))
             known_checks[check_name] = cls(execute_module=self._execute_module)
-
         return known_checks
 
 
 def resolve_checks(names, all_checks):
     """Returns a set of resolved check names.
 
-    Resolving a check name involves expanding tag references (e.g., '@tag') with
-    all the checks that contain the given tag.
+    Resolving a check name expands tag references (e.g., "@tag") to all the
+    checks that contain the given tag. OpenShiftCheckException is raised if
+    names contains an unknown check or tag name.
 
     names should be a sequence of strings.
 
     all_checks should be a sequence of check classes/instances.
     """
-    resolved = set()
-    for name in names:
-        if name.startswith("@"):
-            for check in all_checks:
-                if name[1:] in check.tags:
-                    resolved.add(check.name)
-        else:
-            resolved.add(name)
+    known_check_names = set(check.name for check in all_checks)
+    known_tag_names = set(name for check in all_checks for name in check.tags)
+
+    check_names = set(name for name in names if not name.startswith('@'))
+    tag_names = set(name[1:] for name in names if name.startswith('@'))
+
+    unknown_check_names = check_names - known_check_names
+    unknown_tag_names = tag_names - known_tag_names
+
+    if unknown_check_names or unknown_tag_names:
+        msg = []
+        if unknown_check_names:
+            msg.append('Unknown check names: {}.'.format(', '.join(sorted(unknown_check_names))))
+        if unknown_tag_names:
+            msg.append('Unknown tag names: {}.'.format(', '.join(sorted(unknown_tag_names))))
+        msg.append('Make sure there is no typo in the playbook and no files are missing.')
+        raise OpenShiftCheckException('\n'.join(msg))
+
+    tag_to_checks = defaultdict(set)
+    for check in all_checks:
+        for tag in check.tags:
+            tag_to_checks[tag].add(check.name)
+
+    resolved = check_names.copy()
+    for tag in tag_names:
+        resolved.update(tag_to_checks[tag])
+
     return resolved

+ 227 - 0
roles/openshift_health_checker/test/action_plugin_test.py

@@ -0,0 +1,227 @@
+import pytest
+
+from openshift_health_check import ActionModule, resolve_checks
+from openshift_checks import OpenShiftCheckException
+
+
+def fake_check(name='fake_check', tags=None, is_active=True, run_return=None, run_exception=None):
+    """Returns a new class that is compatible with OpenShiftCheck for testing."""
+
+    _name, _tags = name, tags
+
+    class FakeCheck(object):
+        name = _name
+        tags = _tags or []
+
+        def __init__(self, execute_module=None):
+            pass
+
+        @classmethod
+        def is_active(cls, task_vars):
+            return is_active
+
+        def run(self, tmp, task_vars):
+            if run_exception is not None:
+                raise run_exception
+            return run_return
+
+    return FakeCheck
+
+
+# Fixtures
+
+
+@pytest.fixture
+def plugin():
+    task = FakeTask('openshift_health_check', {'checks': ['fake_check']})
+    plugin = ActionModule(task, None, None, None, None, None)
+    return plugin
+
+
+class FakeTask(object):
+    def __init__(self, action, args):
+        self.action = action
+        self.args = args
+        self.async = 0
+
+
+@pytest.fixture
+def task_vars():
+    return dict(openshift=dict(), ansible_host='unit-test-host')
+
+
+# Assertion helpers
+
+
+def failed(result, msg_has=None):
+    if msg_has is not None:
+        assert 'msg' in result
+        for term in msg_has:
+            assert term in result['msg']
+    return result.get('failed', False)
+
+
+def changed(result):
+    return result.get('changed', False)
+
+
+def skipped(result):
+    return result.get('skipped', False)
+
+
+# Tests
+
+
+@pytest.mark.parametrize('task_vars', [
+    None,
+    {},
+])
+def test_action_plugin_missing_openshift_facts(plugin, task_vars):
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert failed(result, msg_has=['openshift_facts'])
+
+
+def test_action_plugin_cannot_load_checks_with_the_same_name(plugin, task_vars, monkeypatch):
+    FakeCheck1 = fake_check('duplicate_name')
+    FakeCheck2 = fake_check('duplicate_name')
+    checks = [FakeCheck1, FakeCheck2]
+    monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
+
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert failed(result, msg_has=['unique', 'duplicate_name', 'FakeCheck'])
+
+
+def test_action_plugin_skip_non_active_checks(plugin, task_vars, monkeypatch):
+    checks = [fake_check(is_active=False)]
+    monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
+
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert result['checks']['fake_check'] == {'skipped': True}
+    assert not failed(result)
+    assert not changed(result)
+    assert not skipped(result)
+
+
+def test_action_plugin_run_check_ok(plugin, task_vars, monkeypatch):
+    check_return_value = {'ok': 'test'}
+    check_class = fake_check(run_return=check_return_value)
+    monkeypatch.setattr(plugin, 'load_known_checks', lambda: {'fake_check': check_class()})
+    monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
+
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert result['checks']['fake_check'] == check_return_value
+    assert not failed(result)
+    assert not changed(result)
+    assert not skipped(result)
+
+
+def test_action_plugin_run_check_changed(plugin, task_vars, monkeypatch):
+    check_return_value = {'ok': 'test', 'changed': True}
+    check_class = fake_check(run_return=check_return_value)
+    monkeypatch.setattr(plugin, 'load_known_checks', lambda: {'fake_check': check_class()})
+    monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
+
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert result['checks']['fake_check'] == check_return_value
+    assert not failed(result)
+    assert changed(result)
+    assert not skipped(result)
+
+
+def test_action_plugin_run_check_fail(plugin, task_vars, monkeypatch):
+    check_return_value = {'failed': True}
+    check_class = fake_check(run_return=check_return_value)
+    monkeypatch.setattr(plugin, 'load_known_checks', lambda: {'fake_check': check_class()})
+    monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
+
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert result['checks']['fake_check'] == check_return_value
+    assert failed(result, msg_has=['failed'])
+    assert not changed(result)
+    assert not skipped(result)
+
+
+def test_action_plugin_run_check_exception(plugin, task_vars, monkeypatch):
+    exception_msg = 'fake check has an exception'
+    run_exception = OpenShiftCheckException(exception_msg)
+    check_class = fake_check(run_exception=run_exception)
+    monkeypatch.setattr(plugin, 'load_known_checks', lambda: {'fake_check': check_class()})
+    monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
+
+    result = plugin.run(tmp=None, task_vars=task_vars)
+
+    assert failed(result['checks']['fake_check'], msg_has=exception_msg)
+    assert failed(result, msg_has=['failed'])
+    assert not changed(result)
+    assert not skipped(result)
+
+
+@pytest.mark.parametrize('names,all_checks,expected', [
+    ([], [], set()),
+    (
+        ['a', 'b'],
+        [
+            fake_check('a'),
+            fake_check('b'),
+        ],
+        set(['a', 'b']),
+    ),
+    (
+        ['a', 'b', '@group'],
+        [
+            fake_check('from_group_1', ['group', 'another_group']),
+            fake_check('not_in_group', ['another_group']),
+            fake_check('from_group_2', ['preflight', 'group']),
+            fake_check('a'),
+            fake_check('b'),
+        ],
+        set(['a', 'b', 'from_group_1', 'from_group_2']),
+    ),
+])
+def test_resolve_checks_ok(names, all_checks, expected):
+    assert resolve_checks(names, all_checks) == expected
+
+
+@pytest.mark.parametrize('names,all_checks,words_in_exception,words_not_in_exception', [
+    (
+        ['testA', 'testB'],
+        [],
+        ['check', 'name', 'testA', 'testB'],
+        ['tag', 'group', '@'],
+    ),
+    (
+        ['@group'],
+        [],
+        ['tag', 'name', 'group'],
+        ['check', '@'],
+    ),
+    (
+        ['testA', 'testB', '@group'],
+        [],
+        ['check', 'name', 'testA', 'testB', 'tag', 'group'],
+        ['@'],
+    ),
+    (
+        ['testA', 'testB', '@group'],
+        [
+            fake_check('from_group_1', ['group', 'another_group']),
+            fake_check('not_in_group', ['another_group']),
+            fake_check('from_group_2', ['preflight', 'group']),
+        ],
+        ['check', 'name', 'testA', 'testB'],
+        ['tag', 'group', '@'],
+    ),
+])
+def test_resolve_checks_failure(names, all_checks, words_in_exception, words_not_in_exception):
+    with pytest.raises(Exception) as excinfo:
+        resolve_checks(names, all_checks)
+    for word in words_in_exception:
+        assert word in str(excinfo.value)
+    for word in words_not_in_exception:
+        assert word not in str(excinfo.value)

+ 7 - 2
roles/openshift_health_checker/test/conftest.py

@@ -1,5 +1,10 @@
 import os
 import sys
 
-# extend sys.path so that tests can import openshift_checks
-sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
+# extend sys.path so that tests can import openshift_checks and action plugins
+# from this role.
+openshift_health_checker_path = os.path.dirname(os.path.dirname(__file__))
+sys.path[1:1] = [
+    openshift_health_checker_path,
+    os.path.join(openshift_health_checker_path, 'action_plugins')
+]