action_plugin_test.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. import pytest
  2. from ansible.playbook.play_context import PlayContext
  3. from openshift_health_check import ActionModule, resolve_checks
  4. from openshift_health_check import copy_remote_file_to_dir, write_result_to_output_dir, write_to_output_file
  5. from openshift_checks import OpenShiftCheckException, FileToSave
  6. def fake_check(name='fake_check', tags=None, is_active=True, run_return=None, run_exception=None,
  7. run_logs=None, run_files=None, changed=False, get_var_return=None):
  8. """Returns a new class that is compatible with OpenShiftCheck for testing."""
  9. _name, _tags = name, tags
  10. class FakeCheck(object):
  11. name = _name
  12. tags = _tags or []
  13. def __init__(self, **_):
  14. self.changed = False
  15. self.failures = []
  16. self.logs = run_logs or []
  17. self.files_to_save = run_files or []
  18. def is_active(self):
  19. if isinstance(is_active, Exception):
  20. raise is_active
  21. return is_active
  22. def run(self):
  23. self.changed = changed
  24. if run_exception is not None:
  25. raise run_exception
  26. return run_return
  27. def get_var(*args, **_):
  28. return get_var_return
  29. def register_failure(self, exc):
  30. self.failures.append(OpenShiftCheckException(str(exc)))
  31. return
  32. return FakeCheck
  33. # Fixtures
  34. @pytest.fixture
  35. def plugin():
  36. task = FakeTask('openshift_health_check', {'checks': ['fake_check']})
  37. plugin = ActionModule(task, None, PlayContext(), None, None, None)
  38. return plugin
  39. class FakeTask(object):
  40. def __init__(self, action, args):
  41. self.action = action
  42. self.args = args
  43. self.async = 0
  44. @pytest.fixture
  45. def task_vars():
  46. return dict(openshift=dict(), ansible_host='unit-test-host')
  47. # Assertion helpers
  48. def failed(result, msg_has=None):
  49. if msg_has is not None:
  50. assert 'msg' in result
  51. for term in msg_has:
  52. assert term.lower() in result['msg'].lower()
  53. return result.get('failed', False)
  54. def changed(result):
  55. return result.get('changed', False)
  56. # tests whether task is skipped, not individual checks
  57. def skipped(result):
  58. return result.get('skipped', False)
  59. # Tests
  60. @pytest.mark.parametrize('task_vars', [
  61. None,
  62. {},
  63. ])
  64. def test_action_plugin_missing_openshift_facts(plugin, task_vars, monkeypatch):
  65. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  66. result = plugin.run(tmp=None, task_vars=task_vars)
  67. assert failed(result, msg_has=['openshift_facts'])
  68. def test_action_plugin_cannot_load_checks_with_the_same_name(plugin, task_vars, monkeypatch):
  69. FakeCheck1 = fake_check('duplicate_name')
  70. FakeCheck2 = fake_check('duplicate_name')
  71. checks = [FakeCheck1, FakeCheck2]
  72. monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
  73. result = plugin.run(tmp=None, task_vars=task_vars)
  74. assert failed(result, msg_has=['duplicate', 'duplicate_name', 'FakeCheck'])
  75. @pytest.mark.parametrize('is_active, skipped_reason', [
  76. (False, "Not active for this host"),
  77. (Exception("borked"), "exception"),
  78. ])
  79. def test_action_plugin_skip_non_active_checks(is_active, skipped_reason, plugin, task_vars, monkeypatch):
  80. checks = [fake_check(is_active=is_active)]
  81. monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
  82. result = plugin.run(tmp=None, task_vars=task_vars)
  83. assert result['checks']['fake_check'].get('skipped')
  84. assert skipped_reason in result['checks']['fake_check'].get('skipped_reason')
  85. assert not failed(result)
  86. assert not changed(result)
  87. assert not skipped(result)
  88. @pytest.mark.parametrize('to_disable', [
  89. 'fake_check',
  90. ['fake_check', 'spam'],
  91. '*,spam,eggs',
  92. ])
  93. def test_action_plugin_skip_disabled_checks(to_disable, plugin, task_vars, monkeypatch):
  94. checks = [fake_check('fake_check', is_active=True)]
  95. monkeypatch.setattr('openshift_checks.OpenShiftCheck.subclasses', classmethod(lambda cls: checks))
  96. task_vars['openshift_disable_check'] = to_disable
  97. result = plugin.run(tmp=None, task_vars=task_vars)
  98. assert result['checks']['fake_check'] == dict(skipped=True, skipped_reason="Disabled by user request")
  99. assert not failed(result)
  100. assert not changed(result)
  101. assert not skipped(result)
  102. def test_action_plugin_run_list_checks(monkeypatch):
  103. task = FakeTask('openshift_health_check', {'checks': []})
  104. plugin = ActionModule(task, None, PlayContext(), None, None, None)
  105. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {})
  106. result = plugin.run()
  107. assert failed(result, msg_has="Available checks")
  108. assert not changed(result)
  109. assert not skipped(result)
  110. def test_action_plugin_run_check_ok(plugin, task_vars, monkeypatch):
  111. check_return_value = {'ok': 'test'}
  112. check_class = fake_check(run_return=check_return_value, run_files=[None])
  113. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  114. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  115. result = plugin.run(tmp=None, task_vars=task_vars)
  116. assert result['checks']['fake_check'] == check_return_value
  117. assert not failed(result)
  118. assert not changed(result)
  119. assert not skipped(result)
  120. def test_action_plugin_run_check_changed(plugin, task_vars, monkeypatch):
  121. check_return_value = {'ok': 'test'}
  122. check_class = fake_check(run_return=check_return_value, changed=True)
  123. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  124. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  125. result = plugin.run(tmp=None, task_vars=task_vars)
  126. assert result['checks']['fake_check'] == check_return_value
  127. assert changed(result['checks']['fake_check'])
  128. assert not failed(result)
  129. assert changed(result)
  130. assert not skipped(result)
  131. def test_action_plugin_run_check_fail(plugin, task_vars, monkeypatch):
  132. check_return_value = {'failed': True, 'msg': 'this is a failure'}
  133. check_class = fake_check(run_return=check_return_value)
  134. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  135. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  136. result = plugin.run(tmp=None, task_vars=task_vars)
  137. assert result['checks']['fake_check'] == check_return_value
  138. assert failed(result, msg_has=['failed'])
  139. assert not changed(result)
  140. assert not skipped(result)
  141. @pytest.mark.parametrize('exc_class, expect_traceback', [
  142. (OpenShiftCheckException, False),
  143. (Exception, True),
  144. ])
  145. def test_action_plugin_run_check_exception(plugin, task_vars, exc_class, expect_traceback, monkeypatch):
  146. exception_msg = 'fake check has an exception'
  147. run_exception = exc_class(exception_msg)
  148. check_class = fake_check(run_exception=run_exception, changed=True)
  149. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  150. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  151. result = plugin.run(tmp=None, task_vars=task_vars)
  152. assert failed(result['checks']['fake_check'], msg_has=exception_msg)
  153. assert expect_traceback == ("Traceback" in result['checks']['fake_check']['msg'])
  154. assert failed(result, msg_has=['failed'])
  155. assert changed(result['checks']['fake_check'])
  156. assert changed(result)
  157. assert not skipped(result)
  158. def test_action_plugin_run_check_output_dir(plugin, task_vars, tmpdir, monkeypatch):
  159. check_class = fake_check(
  160. run_return={},
  161. run_logs=[('thing', 'note')],
  162. run_files=[
  163. FileToSave('save.file', 'contents', None),
  164. FileToSave('save.file', 'duplicate', None),
  165. FileToSave('copy.file', None, 'foo'), # note: copy runs execute_module => exception
  166. ],
  167. )
  168. task_vars['openshift_checks_output_dir'] = str(tmpdir)
  169. check_class.get_var = lambda self, name, **_: task_vars.get(name)
  170. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {'fake_check': check_class()})
  171. monkeypatch.setattr('openshift_health_check.resolve_checks', lambda *args: ['fake_check'])
  172. plugin.run(tmp=None, task_vars=task_vars)
  173. assert any(path.basename == task_vars['ansible_host'] for path in tmpdir.listdir())
  174. assert any(path.basename == 'fake_check.log.json' for path in tmpdir.visit())
  175. assert any(path.basename == 'save.file' for path in tmpdir.visit())
  176. assert any(path.basename == 'save.file.2' for path in tmpdir.visit())
  177. def test_action_plugin_resolve_checks_exception(plugin, task_vars, monkeypatch):
  178. monkeypatch.setattr(plugin, 'load_known_checks', lambda *_: {})
  179. result = plugin.run(tmp=None, task_vars=task_vars)
  180. assert failed(result, msg_has=['unknown', 'name'])
  181. assert not changed(result)
  182. assert not skipped(result)
  183. @pytest.mark.parametrize('names,all_checks,expected', [
  184. ([], [], set()),
  185. (
  186. ['a', 'b'],
  187. [
  188. fake_check('a'),
  189. fake_check('b'),
  190. ],
  191. set(['a', 'b']),
  192. ),
  193. (
  194. ['a', 'b', '@group'],
  195. [
  196. fake_check('from_group_1', ['group', 'another_group']),
  197. fake_check('not_in_group', ['another_group']),
  198. fake_check('from_group_2', ['preflight', 'group']),
  199. fake_check('a'),
  200. fake_check('b'),
  201. ],
  202. set(['a', 'b', 'from_group_1', 'from_group_2']),
  203. ),
  204. ])
  205. def test_resolve_checks_ok(names, all_checks, expected):
  206. assert resolve_checks(names, all_checks) == expected
  207. @pytest.mark.parametrize('names,all_checks,words_in_exception', [
  208. (
  209. ['testA', 'testB'],
  210. [],
  211. ['check', 'name', 'testA', 'testB'],
  212. ),
  213. (
  214. ['@group'],
  215. [],
  216. ['tag', 'name', 'group'],
  217. ),
  218. (
  219. ['testA', 'testB', '@group'],
  220. [],
  221. ['check', 'name', 'testA', 'testB', 'tag', 'group'],
  222. ),
  223. (
  224. ['testA', 'testB', '@group'],
  225. [
  226. fake_check('from_group_1', ['group', 'another_group']),
  227. fake_check('not_in_group', ['another_group']),
  228. fake_check('from_group_2', ['preflight', 'group']),
  229. ],
  230. ['check', 'name', 'testA', 'testB'],
  231. ),
  232. ])
  233. def test_resolve_checks_failure(names, all_checks, words_in_exception):
  234. with pytest.raises(Exception) as excinfo:
  235. resolve_checks(names, all_checks)
  236. for word in words_in_exception:
  237. assert word in str(excinfo.value)
  238. @pytest.mark.parametrize('give_output_dir, result, expect_file', [
  239. (False, None, False),
  240. (True, dict(content="c3BhbQo=", encoding="base64"), True),
  241. (True, dict(content="encoding error", encoding="base64"), False),
  242. (True, dict(content="spam", no_encoding=None), True),
  243. (True, dict(failed=True, msg="could not slurp"), False),
  244. ])
  245. def test_copy_remote_file_to_dir(give_output_dir, result, expect_file, tmpdir):
  246. check = fake_check()()
  247. check.execute_module = lambda *args, **_: result
  248. copy_remote_file_to_dir(check, "remote_file", str(tmpdir) if give_output_dir else "", "local_file")
  249. assert expect_file == any(path.basename == "local_file" for path in tmpdir.listdir())
  250. def test_write_to_output_exceptions(tmpdir, monkeypatch, capsys):
  251. class Spam(object):
  252. def __str__(self):
  253. raise Exception("break str")
  254. test = {1: object(), 2: Spam()}
  255. test[3] = test
  256. write_result_to_output_dir(str(tmpdir), test)
  257. assert "Error writing" in test["output_files"]
  258. output_dir = tmpdir.join("eggs")
  259. output_dir.write("spam") # so now it's not a dir
  260. write_to_output_file(str(output_dir), "somefile", "somedata")
  261. assert "Could not write" in capsys.readouterr()[1]
  262. monkeypatch.setattr("openshift_health_check.prepare_output_dir", lambda *_: False)
  263. write_result_to_output_dir(str(tmpdir), test)
  264. assert "Error creating" in test["output_files"]