# action_plugin_test.py (12 KB)
  1. import pytest
  2. from ansible.playbook.play_context import PlayContext
  3. from openshift_health_check import ActionModule, resolve_checks
  4. from openshift_health_check import copy_remote_file_to_dir, write_result_to_output_dir, write_to_output_file
  5. from openshift_checks import OpenShiftCheckException, FileToSave
  6. def fake_check(name='fake_check', tags=None, is_active=True, run_return=None, run_exception=None,
  7. run_logs=None, run_files=None, changed=False, get_var_return=None):
  8. """Returns a new class that is compatible with OpenShiftCheck for testing."""
  9. _name, _tags = name, tags
  10. class FakeCheck(object):
  11. name = _name
  12. tags = _tags or []
  13. def __init__(self, **_):
  14. self.changed = False
  15. self.failures = []
  16. self.logs = run_logs or []
  17. self.files_to_save = run_files or []
  18. def is_active(self):
  19. if isinstance(is_active, Exception):
  20. raise is_active
  21. return is_active
  22. def run(self):
  23. self.changed = changed
  24. if run_exception is not None:
  25. raise run_exception
  26. return run_return
  27. def get_var(*args, **_):
  28. return get_var_return
  29. def register_failure(self, exc):
  30. self.failures.append(OpenShiftCheckException(str(exc)))
  31. return
  32. return FakeCheck
  33. # Fixtures
  34. @pytest.fixture
  35. def plugin():
  36. task = FakeTask('openshift_health_check', {'checks': ['fake_check']})
  37. plugin = ActionModule(task, None, PlayContext(), None, None, None)
  38. return plugin
  39. class FakeTask(object):
  40. def __init__(self, action, args):
  41. self.action = action
  42. self.args = args
  43. self.async = 0
  44. @pytest.fixture
  45. def task_vars():
  46. return dict(openshift=dict(), ansible_host='unit-test-host')
  47. # Assertion helpers
  48. def failed(result, msg_has=None):
  49. if msg_has is not None:
  50. assert 'msg' in result
  51. for term in msg_has:
  52. assert term.lower() in result['msg'].lower()
  53. return result.get('failed', False)
  54. def changed(result):
  55. return result.get('changed', False)
  56. # tests whether task is skipped, not individual checks
  57. def skipped(result):
  58. return result.get('skipped', False)
  59. # Tests
  60. @pytest.mark.parametrize('task_vars', [
  61. None,
  62. {},
  63. ])
  64. def test_action_plugin_missing_openshift_facts(plugin, task_vars, monkeypatch):
  65. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {})
  66. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  67. result = plugin.run(tmp=None, task_vars=task_vars)
  68. assert failed(result, msg_has=['openshift_facts'])
  69. def test_action_plugin_cannot_load_checks_with_the_same_name(plugin, task_vars, monkeypatch):
  70. FakeCheck1 = fake_check('duplicate_name')
  71. FakeCheck2 = fake_check('duplicate_name')
  72. checks = [FakeCheck1, FakeCheck2]
  73. monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
  74. result = plugin.run(tmp=None, task_vars=task_vars)
  75. assert failed(result, msg_has=['duplicate', 'duplicate_name', 'FakeCheck'])
  76. @pytest.mark.parametrize('is_active, skipped_reason', [
  77. (False, "Not active for this host"),
  78. (Exception("borked"), "exception"),
  79. ])
  80. def test_action_plugin_skip_non_active_checks(is_active, skipped_reason, plugin, task_vars, monkeypatch):
  81. checks = [fake_check(is_active=is_active)]
  82. monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
  83. result = plugin.run(tmp=None, task_vars=task_vars)
  84. assert result['checks']['fake_check'].get('skipped')
  85. assert skipped_reason in result['checks']['fake_check'].get('skipped_reason')
  86. assert not failed(result)
  87. assert not changed(result)
  88. assert not skipped(result)
  89. @pytest.mark.parametrize('to_disable', [
  90. 'fake_check',
  91. ['fake_check', 'spam'],
  92. '*,spam,eggs',
  93. ])
  94. def test_action_plugin_skip_disabled_checks(to_disable, plugin, task_vars, monkeypatch):
  95. checks = [fake_check('fake_check', is_active=True)]
  96. monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
  97. task_vars['openshift_disable_check'] = to_disable
  98. result = plugin.run(tmp=None, task_vars=task_vars)
  99. assert result['checks']['fake_check'] == dict(skipped=True, skipped_reason="Disabled by user request")
  100. assert not failed(result)
  101. assert not changed(result)
  102. assert not skipped(result)
  103. def test_action_plugin_run_list_checks(monkeypatch):
  104. task = FakeTask('openshift_health_check', {'checks': []})
  105. plugin = ActionModule(task, None, PlayContext(), None, None, None)
  106. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {})
  107. result = plugin.run()
  108. assert failed(result, msg_has="Available checks")
  109. assert not changed(result)
  110. assert not skipped(result)
  111. def test_action_plugin_run_check_ok(plugin, task_vars, monkeypatch):
  112. check_return_value = {'ok': 'test'}
  113. check_class = fake_check(run_return=check_return_value, run_files=[None])
  114. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  115. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  116. result = plugin.run(tmp=None, task_vars=task_vars)
  117. assert result['checks']['fake_check'] == check_return_value
  118. assert not failed(result)
  119. assert not changed(result)
  120. assert not skipped(result)
  121. def test_action_plugin_run_check_changed(plugin, task_vars, monkeypatch):
  122. check_return_value = {'ok': 'test'}
  123. check_class = fake_check(run_return=check_return_value, changed=True)
  124. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  125. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  126. result = plugin.run(tmp=None, task_vars=task_vars)
  127. assert result['checks']['fake_check'] == check_return_value
  128. assert changed(result['checks']['fake_check'])
  129. assert not failed(result)
  130. assert changed(result)
  131. assert not skipped(result)
  132. def test_action_plugin_run_check_fail(plugin, task_vars, monkeypatch):
  133. check_return_value = {'failed': True, 'msg': 'this is a failure'}
  134. check_class = fake_check(run_return=check_return_value)
  135. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  136. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  137. result = plugin.run(tmp=None, task_vars=task_vars)
  138. assert result['checks']['fake_check'] == check_return_value
  139. assert failed(result, msg_has=['failed'])
  140. assert not changed(result)
  141. assert not skipped(result)
  142. @pytest.mark.parametrize('exc_class, expect_traceback', [
  143. (OpenShiftCheckException, False),
  144. (Exception, True),
  145. ])
  146. def test_action_plugin_run_check_exception(plugin, task_vars, exc_class, expect_traceback, monkeypatch):
  147. exception_msg = 'fake check has an exception'
  148. run_exception = exc_class(exception_msg)
  149. check_class = fake_check(run_exception=run_exception, changed=True)
  150. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  151. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  152. result = plugin.run(tmp=None, task_vars=task_vars)
  153. assert failed(result['checks']['fake_check'], msg_has=exception_msg)
  154. assert expect_traceback == ("Traceback" in result['checks']['fake_check']['msg'])
  155. assert failed(result, msg_has=['failed'])
  156. assert changed(result['checks']['fake_check'])
  157. assert changed(result)
  158. assert not skipped(result)
  159. def test_action_plugin_run_check_output_dir(plugin, task_vars, tmpdir, monkeypatch):
  160. check_class = fake_check(
  161. run_return={},
  162. run_logs=[('thing', 'note')],
  163. run_files=[
  164. FileToSave('save.file', 'contents', None),
  165. FileToSave('save.file', 'duplicate', None),
  166. FileToSave('copy.file', None, 'foo'), # note: copy runs execute_module => exception
  167. ],
  168. )
  169. task_vars['openshift_checks_output_dir'] = str(tmpdir)
  170. check_class.get_var = lambda self, name, **_: task_vars.get(name)
  171. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  172. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  173. plugin.run(tmp=None, task_vars=task_vars)
  174. assert any(path.basename == task_vars['ansible_host'] for path in tmpdir.listdir())
  175. assert any(path.basename == 'fake_check.log.json' for path in tmpdir.visit())
  176. assert any(path.basename == 'save.file' for path in tmpdir.visit())
  177. assert any(path.basename == 'save.file.2' for path in tmpdir.visit())
  178. def test_action_plugin_resolve_checks_exception(plugin, task_vars, monkeypatch):
  179. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {})
  180. result = plugin.run(tmp=None, task_vars=task_vars)
  181. assert failed(result, msg_has=['unknown', 'name'])
  182. assert not changed(result)
  183. assert not skipped(result)
  184. @pytest.mark.parametrize('names,all_checks,expected', [
  185. ([], [], set()),
  186. (
  187. ['a', 'b'],
  188. [
  189. fake_check('a'),
  190. fake_check('b'),
  191. ],
  192. set(['a', 'b']),
  193. ),
  194. (
  195. ['a', 'b', '@group'],
  196. [
  197. fake_check('from_group_1', ['group', 'another_group']),
  198. fake_check('not_in_group', ['another_group']),
  199. fake_check('from_group_2', ['preflight', 'group']),
  200. fake_check('a'),
  201. fake_check('b'),
  202. ],
  203. set(['a', 'b', 'from_group_1', 'from_group_2']),
  204. ),
  205. ])
  206. def test_resolve_checks_ok(names, all_checks, expected):
  207. assert resolve_checks(names, all_checks) == expected
  208. @pytest.mark.parametrize('names,all_checks,words_in_exception', [
  209. (
  210. ['testA', 'testB'],
  211. [],
  212. ['check', 'name', 'testA', 'testB'],
  213. ),
  214. (
  215. ['@group'],
  216. [],
  217. ['tag', 'name', 'group'],
  218. ),
  219. (
  220. ['testA', 'testB', '@group'],
  221. [],
  222. ['check', 'name', 'testA', 'testB', 'tag', 'group'],
  223. ),
  224. (
  225. ['testA', 'testB', '@group'],
  226. [
  227. fake_check('from_group_1', ['group', 'another_group']),
  228. fake_check('not_in_group', ['another_group']),
  229. fake_check('from_group_2', ['preflight', 'group']),
  230. ],
  231. ['check', 'name', 'testA', 'testB'],
  232. ),
  233. ])
  234. def test_resolve_checks_failure(names, all_checks, words_in_exception):
  235. with pytest.raises(Exception) as excinfo:
  236. resolve_checks(names, all_checks)
  237. for word in words_in_exception:
  238. assert word in str(excinfo.value)
  239. @pytest.mark.parametrize('give_output_dir, result, expect_file', [
  240. (False, None, False),
  241. (True, dict(content="c3BhbQo=", encoding="base64"), True),
  242. (True, dict(content="encoding error", encoding="base64"), False),
  243. (True, dict(content="spam", no_encoding=None), True),
  244. (True, dict(failed=True, msg="could not slurp"), False),
  245. ])
  246. def test_copy_remote_file_to_dir(give_output_dir, result, expect_file, tmpdir):
  247. check = fake_check()()
  248. check.execute_module = lambda *args, **_: result
  249. copy_remote_file_to_dir(check, "remote_file", str(tmpdir) if give_output_dir else "", "local_file")
  250. assert expect_file == any(path.basename == "local_file" for path in tmpdir.listdir())
  251. def test_write_to_output_exceptions(tmpdir, monkeypatch, capsys):
  252. class Spam(object):
  253. def __str__(self):
  254. raise Exception("break str")
  255. test = {1: object(), 2: Spam()}
  256. test[3] = test
  257. write_result_to_output_dir(str(tmpdir), test)
  258. assert "Error writing" in test["output_files"]
  259. output_dir = tmpdir.join("eggs")
  260. output_dir.write("spam") # so now it's not a dir
  261. write_to_output_file(str(output_dir), "somefile", "somedata")
  262. assert "Could not write" in capsys.readouterr()[1]
  263. monkeypatch.setattr("openshift_health_check.prepare_output_dir", lambda *_: False)
  264. write_result_to_output_dir(str(tmpdir), test)
  265. assert "Error creating" in test["output_files"]