logging_check_test.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import pytest
  2. import json
  3. from openshift_checks.logging.logging import LoggingCheck, MissingComponentPods, CouldNotUseOc
# Minimal Ansible task_vars supplying the openshift config base path;
# used by the exec_oc failure tests below when constructing LoggingCheck.
task_vars_config_base = dict(openshift=dict(common=dict(config_base='/etc/origin')))
  5. def canned_loggingcheck(exec_oc=None, execute_module=None):
  6. """Create a LoggingCheck object with canned exec_oc method"""
  7. check = LoggingCheck(execute_module)
  8. if exec_oc:
  9. check.exec_oc = exec_oc
  10. return check
  11. def assert_error(error, expect_error):
  12. if expect_error:
  13. assert error
  14. assert expect_error in error
  15. else:
  16. assert not error
# Canned `oc get pods` entry for a healthy Elasticsearch pod (Ready, with podIP).
# `_test_master_name_str` is an extra test-only key — presumably matched by
# assertions elsewhere in the suite; confirm against the full test file.
plain_es_pod = {
    "metadata": {
        "labels": {"component": "es", "deploymentconfig": "logging-es"},
        "name": "logging-es",
    },
    "status": {
        "conditions": [{"status": "True", "type": "Ready"}],
        "containerStatuses": [{"ready": True}],
        "podIP": "10.10.10.10",
    },
    "_test_master_name_str": "name logging-es",
}
# Canned entry for a healthy Kibana pod: two ready containers, Ready condition.
plain_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}
# Kibana pod that reports Ready but is missing the containerStatuses field
# entirely — used to verify such pods are treated as not running.
plain_kibana_pod_no_containerstatus = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}
# Canned entry for a healthy Fluentd pod scheduled on node1.
fluentd_pod_node1 = {
    "metadata": {
        "labels": {"component": "fluentd", "deploymentconfig": "logging-fluentd"},
        "name": "logging-fluentd-1",
    },
    "spec": {"host": "node1", "nodeName": "node1"},
    "status": {
        "containerStatuses": [{"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}
# Canned entry for a healthy Curator pod (Ready, with podIP).
plain_curator_pod = {
    "metadata": {
        "labels": {"component": "curator", "deploymentconfig": "logging-curator"},
        "name": "logging-curator-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
        "podIP": "10.10.10.10",
    }
}
  70. @pytest.mark.parametrize('problem, expect', [
  71. ("[Errno 2] No such file or directory", "supposed to be a master"),
  72. ("Permission denied", "Unexpected error using `oc`"),
  73. ])
  74. def test_oc_failure(problem, expect):
  75. def execute_module(module_name, *_):
  76. if module_name == "ocutil":
  77. return dict(failed=True, result=problem)
  78. return dict(changed=False)
  79. check = LoggingCheck(execute_module, task_vars_config_base)
  80. with pytest.raises(CouldNotUseOc) as excinfo:
  81. check.exec_oc('get foo', [])
  82. assert expect in str(excinfo)
# Inventory groups where 'this-host' (the test's ansible_host) is the first master.
groups_with_first_master = dict(oo_first_master=['this-host'])
# Inventory groups where some other host is the master, so checks should be inactive.
groups_not_a_master = dict(oo_first_master=['other-host'], oo_masters=['other-host'])
  85. @pytest.mark.parametrize('groups, logging_deployed, is_active', [
  86. (groups_with_first_master, True, True),
  87. (groups_with_first_master, False, False),
  88. (groups_not_a_master, True, False),
  89. (groups_not_a_master, True, False),
  90. ])
  91. def test_is_active(groups, logging_deployed, is_active):
  92. task_vars = dict(
  93. ansible_host='this-host',
  94. groups=groups,
  95. openshift_hosted_logging_deploy=logging_deployed,
  96. )
  97. assert LoggingCheck(None, task_vars).is_active() == is_active
  98. @pytest.mark.parametrize('pod_output, expect_pods', [
  99. (
  100. json.dumps({'items': [plain_es_pod]}),
  101. [plain_es_pod],
  102. ),
  103. ])
  104. def test_get_pods_for_component(pod_output, expect_pods):
  105. check = canned_loggingcheck(lambda *_: pod_output)
  106. pods = check.get_pods_for_component("es")
  107. assert pods == expect_pods
  108. @pytest.mark.parametrize('exec_oc_output, expect_error', [
  109. (
  110. 'No resources found.',
  111. MissingComponentPods,
  112. ),
  113. (
  114. '{"items": null}',
  115. MissingComponentPods,
  116. ),
  117. ])
  118. def test_get_pods_for_component_fail(exec_oc_output, expect_error):
  119. check = canned_loggingcheck(lambda *_: exec_oc_output)
  120. with pytest.raises(expect_error):
  121. check.get_pods_for_component("es")
  122. @pytest.mark.parametrize('name, pods, expected_pods', [
  123. (
  124. 'test single pod found, scheduled, but no containerStatuses field',
  125. [plain_kibana_pod_no_containerstatus],
  126. [plain_kibana_pod_no_containerstatus],
  127. ),
  128. (
  129. 'set of pods has at least one pod with containerStatuses (scheduled); should still fail',
  130. [plain_kibana_pod_no_containerstatus, plain_kibana_pod],
  131. [plain_kibana_pod_no_containerstatus],
  132. ),
  133. ], ids=lambda argvals: argvals[0])
  134. def test_get_not_running_pods_no_container_status(name, pods, expected_pods):
  135. check = canned_loggingcheck(lambda *_: '')
  136. result = check.not_running_pods(pods)
  137. assert result == expected_pods