# logging_check_test.py

import pytest
import json

from openshift_checks.logging.logging import LoggingCheck, MissingComponentPods, CouldNotUseOc

task_vars_config_base = dict(openshift=dict(common=dict(config_base='/etc/origin')))


def canned_loggingcheck(exec_oc=None, execute_module=None):
    """Create a LoggingCheck object with canned exec_oc method"""
    check = LoggingCheck(execute_module)
    if exec_oc:
        check.exec_oc = exec_oc
    return check


def assert_error(error, expect_error):
    if expect_error:
        assert error
        assert expect_error in error
    else:
        assert not error


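# Canned pod API objects used as fixtures by the tests below.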
plain_es_pod = {
    "metadata": {
        "labels": {"component": "es", "deploymentconfig": "logging-es"},
        "name": "logging-es",
    },
    "status": {
        "conditions": [{"status": "True", "type": "Ready"}],
        "containerStatuses": [{"ready": True}],
        "podIP": "10.10.10.10",
    },
    "_test_master_name_str": "name logging-es",
}

plain_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}

plain_kibana_pod_no_containerstatus = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}

fluentd_pod_node1 = {
    "metadata": {
        "labels": {"component": "fluentd", "deploymentconfig": "logging-fluentd"},
        "name": "logging-fluentd-1",
    },
    "spec": {"host": "node1", "nodeName": "node1"},
    "status": {
        "containerStatuses": [{"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}

plain_curator_pod = {
    "metadata": {
        "labels": {"component": "curator", "deploymentconfig": "logging-curator"},
        "name": "logging-curator-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
        "podIP": "10.10.10.10",
    }
}


@pytest.mark.parametrize('problem, expect', [
    ("[Errno 2] No such file or directory", "supposed to be a master"),
    ("Permission denied", "Unexpected error using `oc`"),
])
def test_oc_failure(problem, expect):
    # Stub execute_module so the ocutil module appears to fail with the given problem.
    def execute_module(module_name, *_):
        if module_name == "ocutil":
            return dict(failed=True, result=problem)
        return dict(changed=False)

    check = LoggingCheck(execute_module, task_vars_config_base)

    with pytest.raises(CouldNotUseOc) as excinfo:
        check.exec_oc('get foo', [])

    assert expect in str(excinfo.value)


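# Inventory group variants used to exercise LoggingCheck.is_active().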
groups_with_first_master = dict(masters=['this-host', 'other-host'])
groups_with_second_master = dict(masters=['other-host', 'this-host'])
groups_not_a_master = dict(masters=['other-host'])


@pytest.mark.parametrize('groups, logging_deployed, is_active', [
    (groups_with_first_master, True, True),
    (groups_with_first_master, False, False),
    (groups_not_a_master, True, False),
    (groups_with_second_master, True, False),
    (groups_not_a_master, True, False),
])
def test_is_active(groups, logging_deployed, is_active):
    task_vars = dict(
        ansible_ssh_host='this-host',
        groups=groups,
        openshift_hosted_logging_deploy=logging_deployed,
    )

    assert LoggingCheck(None, task_vars).is_active() == is_active


@pytest.mark.parametrize('pod_output, expect_pods', [
    (
        json.dumps({'items': [plain_es_pod]}),
        [plain_es_pod],
    ),
])
def test_get_pods_for_component(pod_output, expect_pods):
    check = canned_loggingcheck(lambda *_: pod_output)
    pods = check.get_pods_for_component("es")

    assert pods == expect_pods


@pytest.mark.parametrize('exec_oc_output, expect_error', [
    (
        'No resources found.',
        MissingComponentPods,
    ),
    (
        '{"items": null}',
        MissingComponentPods,
    ),
])
def test_get_pods_for_component_fail(exec_oc_output, expect_error):
    check = canned_loggingcheck(lambda *_: exec_oc_output)

    with pytest.raises(expect_error):
        check.get_pods_for_component("es")


@pytest.mark.parametrize('name, pods, expected_pods', [
    (
        'test single pod found, scheduled, but no containerStatuses field',
        [plain_kibana_pod_no_containerstatus],
        [plain_kibana_pod_no_containerstatus],
    ),
    (
        'set of pods has at least one pod with containerStatuses (scheduled); should still fail',
        [plain_kibana_pod_no_containerstatus, plain_kibana_pod],
        [plain_kibana_pod_no_containerstatus],
    ),
], ids=lambda argvals: argvals[0])
def test_get_not_running_pods_no_container_status(name, pods, expected_pods):
    check = canned_loggingcheck(lambda *_: '')
    result = check.not_running_pods(pods)

    assert result == expected_pods