logging_check_test.py

import pytest
import json

from openshift_checks.logging.logging import LoggingCheck, OpenShiftCheckException


task_vars_config_base = dict(openshift=dict(common=dict(config_base='/etc/origin')))

logging_namespace = "logging"


def canned_loggingcheck(exec_oc=None):
    """Create a LoggingCheck object with a canned exec_oc method."""
    check = LoggingCheck()  # fails if a module is actually invoked
    check.logging_namespace = 'logging'
    if exec_oc:
        check.exec_oc = exec_oc
    return check


def assert_error(error, expect_error):
    if expect_error:
        assert error
        assert expect_error in error
    else:
        assert not error
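

# Canned pod manifests, trimmed down to the fields the logging checks read;
# they stand in for entries from `oc get pods -o json` output.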
plain_es_pod = {
    "metadata": {
        "labels": {"component": "es", "deploymentconfig": "logging-es"},
        "name": "logging-es",
    },
    "status": {
        "conditions": [{"status": "True", "type": "Ready"}],
        "containerStatuses": [{"ready": True}],
        "podIP": "10.10.10.10",
    },
    "_test_master_name_str": "name logging-es",
}

plain_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}

plain_kibana_pod_no_containerstatus = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}

fluentd_pod_node1 = {
    "metadata": {
        "labels": {"component": "fluentd", "deploymentconfig": "logging-fluentd"},
        "name": "logging-fluentd-1",
    },
    "spec": {"host": "node1", "nodeName": "node1"},
    "status": {
        "containerStatuses": [{"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
    }
}

plain_curator_pod = {
    "metadata": {
        "labels": {"component": "curator", "deploymentconfig": "logging-curator"},
        "name": "logging-curator-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}],
        "conditions": [{"status": "True", "type": "Ready"}],
        "podIP": "10.10.10.10",
    }
}
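

# exec_oc runs the `oc` client via the ocutil module; these cases simulate
# its two failure modes: client config missing (the host is not actually a
# master) and any other execution error.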
@pytest.mark.parametrize('problem, expect', [
    ("[Errno 2] No such file or directory", "supposed to be a master"),
    ("Permission denied", "Unexpected error using `oc`"),
])
def test_oc_failure(problem, expect):
    def execute_module(module_name, *_):
        if module_name == "ocutil":
            return dict(failed=True, result=problem)
        return dict(changed=False)

    check = LoggingCheck(execute_module, task_vars_config_base)

    with pytest.raises(OpenShiftCheckException) as excinfo:
        check.exec_oc(logging_namespace, 'get foo', [])
    assert expect in str(excinfo)


groups_with_first_master = dict(masters=['this-host', 'other-host'])
groups_with_second_master = dict(masters=['other-host', 'this-host'])
groups_not_a_master = dict(masters=['other-host'])
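

# The check should be active only on the first master listed in the
# inventory, and only when logging is set to be deployed.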
@pytest.mark.parametrize('groups, logging_deployed, is_active', [
    (groups_with_first_master, True, True),
    (groups_with_first_master, False, False),
    (groups_not_a_master, True, False),
    (groups_with_second_master, True, False),
    (groups_not_a_master, True, False),
])
def test_is_active(groups, logging_deployed, is_active):
    task_vars = dict(
        ansible_ssh_host='this-host',
        groups=groups,
        openshift_hosted_logging_deploy=logging_deployed,
    )
    assert LoggingCheck(None, task_vars).is_active() == is_active
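

# get_pods_for_component parses the JSON that `oc get pods` returns; the
# plain-text "No resources found." reply should surface as an error rather
# than a parse failure.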
@pytest.mark.parametrize('pod_output, expect_pods, expect_error', [
    (
        'No resources found.',
        None,
        'No pods were found for the "es"',
    ),
    (
        json.dumps({'items': [plain_kibana_pod, plain_es_pod, plain_curator_pod, fluentd_pod_node1]}),
        [plain_es_pod],
        None,
    ),
])
def test_get_pods_for_component(pod_output, expect_pods, expect_error):
    check = canned_loggingcheck(lambda namespace, cmd, args: pod_output)
    pods, error = check.get_pods_for_component(
        logging_namespace,
        "es",
    )
    assert_error(error, expect_error)
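

# A scheduled pod that has no containerStatuses field yet should be counted
# as not running, even when other pods in the set do report container status.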
@pytest.mark.parametrize('name, pods, expected_pods', [
    (
        'test single pod found, scheduled, but no containerStatuses field',
        [plain_kibana_pod_no_containerstatus],
        [plain_kibana_pod_no_containerstatus],
    ),
    (
        'set of pods has at least one pod with containerStatuses (scheduled); should still fail',
        [plain_kibana_pod_no_containerstatus, plain_kibana_pod],
        [plain_kibana_pod_no_containerstatus],
    ),
], ids=lambda argvals: argvals[0])
def test_get_not_running_pods_no_container_status(name, pods, expected_pods):
    check = canned_loggingcheck(lambda exec_module, namespace, cmd, args, task_vars: '')
    result = check.not_running_pods(pods)
    assert result == expected_pods