logging_check_test.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import pytest
  2. import json
  3. from openshift_checks.logging.logging import LoggingCheck, OpenShiftCheckException
  4. task_vars_config_base = dict(openshift=dict(common=dict(config_base='/etc/origin')))
  5. logging_namespace = "logging"
  6. def canned_loggingcheck(exec_oc=None):
  7. """Create a LoggingCheck object with canned exec_oc method"""
  8. check = LoggingCheck("dummy") # fails if a module is actually invoked
  9. check.logging_namespace = 'logging'
  10. if exec_oc:
  11. check.exec_oc = exec_oc
  12. return check
  13. def assert_error(error, expect_error):
  14. if expect_error:
  15. assert error
  16. assert expect_error in error
  17. else:
  18. assert not error
  19. plain_es_pod = {
  20. "metadata": {
  21. "labels": {"component": "es", "deploymentconfig": "logging-es"},
  22. "name": "logging-es",
  23. },
  24. "status": {
  25. "conditions": [{"status": "True", "type": "Ready"}],
  26. "containerStatuses": [{"ready": True}],
  27. "podIP": "10.10.10.10",
  28. },
  29. "_test_master_name_str": "name logging-es",
  30. }
  31. plain_kibana_pod = {
  32. "metadata": {
  33. "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
  34. "name": "logging-kibana-1",
  35. },
  36. "status": {
  37. "containerStatuses": [{"ready": True}, {"ready": True}],
  38. "conditions": [{"status": "True", "type": "Ready"}],
  39. }
  40. }
  41. fluentd_pod_node1 = {
  42. "metadata": {
  43. "labels": {"component": "fluentd", "deploymentconfig": "logging-fluentd"},
  44. "name": "logging-fluentd-1",
  45. },
  46. "spec": {"host": "node1", "nodeName": "node1"},
  47. "status": {
  48. "containerStatuses": [{"ready": True}],
  49. "conditions": [{"status": "True", "type": "Ready"}],
  50. }
  51. }
  52. plain_curator_pod = {
  53. "metadata": {
  54. "labels": {"component": "curator", "deploymentconfig": "logging-curator"},
  55. "name": "logging-curator-1",
  56. },
  57. "status": {
  58. "containerStatuses": [{"ready": True}],
  59. "conditions": [{"status": "True", "type": "Ready"}],
  60. "podIP": "10.10.10.10",
  61. }
  62. }
  63. @pytest.mark.parametrize('problem, expect', [
  64. ("[Errno 2] No such file or directory", "supposed to be a master"),
  65. ("Permission denied", "Unexpected error using `oc`"),
  66. ])
  67. def test_oc_failure(problem, expect):
  68. def execute_module(module_name, args, task_vars):
  69. if module_name == "ocutil":
  70. return dict(failed=True, result=problem)
  71. return dict(changed=False)
  72. check = LoggingCheck({})
  73. with pytest.raises(OpenShiftCheckException) as excinfo:
  74. check.exec_oc(execute_module, logging_namespace, 'get foo', [], task_vars=task_vars_config_base)
  75. assert expect in str(excinfo)
  76. groups_with_first_master = dict(masters=['this-host', 'other-host'])
  77. groups_with_second_master = dict(masters=['other-host', 'this-host'])
  78. groups_not_a_master = dict(masters=['other-host'])
  79. @pytest.mark.parametrize('groups, logging_deployed, is_active', [
  80. (groups_with_first_master, True, True),
  81. (groups_with_first_master, False, False),
  82. (groups_not_a_master, True, False),
  83. (groups_with_second_master, True, False),
  84. (groups_not_a_master, True, False),
  85. ])
  86. def test_is_active(groups, logging_deployed, is_active):
  87. task_vars = dict(
  88. ansible_ssh_host='this-host',
  89. groups=groups,
  90. openshift_hosted_logging_deploy=logging_deployed,
  91. )
  92. assert LoggingCheck.is_active(task_vars=task_vars) == is_active
  93. @pytest.mark.parametrize('pod_output, expect_pods, expect_error', [
  94. (
  95. 'No resources found.',
  96. None,
  97. 'There are no pods in the logging namespace',
  98. ),
  99. (
  100. json.dumps({'items': [plain_kibana_pod, plain_es_pod, plain_curator_pod, fluentd_pod_node1]}),
  101. [plain_es_pod],
  102. None,
  103. ),
  104. ])
  105. def test_get_pods_for_component(pod_output, expect_pods, expect_error):
  106. check = canned_loggingcheck(lambda exec_module, namespace, cmd, args, task_vars: pod_output)
  107. pods, error = check.get_pods_for_component(
  108. lambda name, args, task_vars: {},
  109. logging_namespace,
  110. "es",
  111. {}
  112. )
  113. assert_error(error, expect_error)