# logging_index_time_test.py (4.9 KB)

  1. import json
  2. import pytest
  3. from openshift_checks.logging.logging_index_time import LoggingIndexTime, OpenShiftCheckException
# Fixed identifier passed as the ``uuid`` argument by the parametrized cases in
# this module, so that expected values can refer to a known string.
SAMPLE_UUID = "unique-test-uuid"
  5. def canned_loggingindextime(exec_oc=None):
  6. """Create a check object with a canned exec_oc method"""
  7. check = LoggingIndexTime("dummy") # fails if a module is actually invoked
  8. if exec_oc:
  9. check.exec_oc = exec_oc
  10. return check
  11. plain_running_elasticsearch_pod = {
  12. "metadata": {
  13. "labels": {"component": "es", "deploymentconfig": "logging-es-data-master"},
  14. "name": "logging-es-data-master-1",
  15. },
  16. "status": {
  17. "containerStatuses": [{"ready": True}, {"ready": True}],
  18. "phase": "Running",
  19. }
  20. }
  21. plain_running_kibana_pod = {
  22. "metadata": {
  23. "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
  24. "name": "logging-kibana-1",
  25. },
  26. "status": {
  27. "containerStatuses": [{"ready": True}, {"ready": True}],
  28. "phase": "Running",
  29. }
  30. }
  31. not_running_kibana_pod = {
  32. "metadata": {
  33. "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
  34. "name": "logging-kibana-2",
  35. },
  36. "status": {
  37. "containerStatuses": [{"ready": True}, {"ready": False}],
  38. "conditions": [{"status": "True", "type": "Ready"}],
  39. "phase": "pending",
  40. }
  41. }
  42. @pytest.mark.parametrize('pods, expect_pods', [
  43. (
  44. [not_running_kibana_pod],
  45. [],
  46. ),
  47. (
  48. [plain_running_kibana_pod],
  49. [plain_running_kibana_pod],
  50. ),
  51. (
  52. [],
  53. [],
  54. )
  55. ])
  56. def test_check_running_pods(pods, expect_pods):
  57. check = canned_loggingindextime(None)
  58. pods = check.running_pods(pods)
  59. assert pods == expect_pods
  60. @pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
  61. (
  62. 'valid count in response',
  63. {
  64. "count": 1,
  65. },
  66. SAMPLE_UUID,
  67. 0.001,
  68. [],
  69. ),
  70. ], ids=lambda argval: argval[0])
  71. def test_wait_until_cmd_or_err_succeeds(name, json_response, uuid, timeout, extra_words):
  72. def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
  73. return json.dumps(json_response)
  74. check = canned_loggingindextime(exec_oc)
  75. check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)
  76. @pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
  77. (
  78. 'invalid json response',
  79. {
  80. "invalid_field": 1,
  81. },
  82. SAMPLE_UUID,
  83. 0.001,
  84. ["invalid response", "Elasticsearch"],
  85. ),
  86. (
  87. 'empty response',
  88. {},
  89. SAMPLE_UUID,
  90. 0.001,
  91. ["invalid response", "Elasticsearch"],
  92. ),
  93. (
  94. 'valid response but invalid match count',
  95. {
  96. "count": 0,
  97. },
  98. SAMPLE_UUID,
  99. 0.005,
  100. ["expecting match", SAMPLE_UUID, "0.005s"],
  101. )
  102. ], ids=lambda argval: argval[0])
  103. def test_wait_until_cmd_or_err(name, json_response, uuid, timeout, extra_words):
  104. def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
  105. return json.dumps(json_response)
  106. check = canned_loggingindextime(exec_oc)
  107. with pytest.raises(OpenShiftCheckException) as error:
  108. check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)
  109. for word in extra_words:
  110. assert word in str(error)
  111. @pytest.mark.parametrize('name, json_response, uuid, extra_words', [
  112. (
  113. 'correct response code, found unique id is returned',
  114. {
  115. "statusCode": 404,
  116. },
  117. "sample unique id",
  118. ["sample unique id"],
  119. ),
  120. ], ids=lambda argval: argval[0])
  121. def test_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
  122. def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
  123. return json.dumps(json_response)
  124. check = canned_loggingindextime(exec_oc)
  125. check.generate_uuid = lambda: uuid
  126. result = check.curl_kibana_with_uuid(plain_running_kibana_pod, None)
  127. for word in extra_words:
  128. assert word in result
  129. @pytest.mark.parametrize('name, json_response, uuid, extra_words', [
  130. (
  131. 'invalid json response',
  132. {
  133. "invalid_field": "invalid",
  134. },
  135. SAMPLE_UUID,
  136. ["invalid response returned", 'Missing "statusCode" key'],
  137. ),
  138. (
  139. 'wrong error code in response',
  140. {
  141. "statusCode": 500,
  142. },
  143. SAMPLE_UUID,
  144. ["Expecting error code", "500"],
  145. ),
  146. ], ids=lambda argval: argval[0])
  147. def test_failed_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
  148. def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
  149. return json.dumps(json_response)
  150. check = canned_loggingindextime(exec_oc)
  151. check.generate_uuid = lambda: uuid
  152. with pytest.raises(OpenShiftCheckException) as error:
  153. check.curl_kibana_with_uuid(plain_running_kibana_pod, None)
  154. for word in extra_words:
  155. assert word in str(error)