etcd_traffic_test.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import pytest
  2. from openshift_checks.etcd_traffic import EtcdTraffic
  @pytest.mark.parametrize('group_names,version,is_active', [
      (['oo_masters_to_config'], "3.5", False),
      (['oo_masters_to_config'], "3.6", False),
      (['oo_nodes_to_config'], "3.4", False),
      (['oo_etcd_to_config'], "3.4", True),
      (['oo_etcd_to_config'], "1.5", True),
      (['oo_etcd_to_config'], "3.1", False),
      (['oo_masters_to_config', 'oo_nodes_to_config'], "3.5", False),
      (['oo_masters_to_config', 'oo_etcd_to_config'], "3.5", True),
      ([], "3.4", False),
  ])
  def test_is_active(group_names, version, is_active):
      """Check EtcdTraffic.is_active() for each host-group / image-tag pair.

      Each case supplies the host's ``group_names`` and an
      ``openshift_image_tag`` value, and states whether the check is
      expected to report itself as active for that combination.
      """
      host_vars = {
          "group_names": group_names,
          "openshift_image_tag": version,
      }
      check = EtcdTraffic(task_vars=host_vars)

      assert check.is_active() == is_active
  @pytest.mark.parametrize('group_names,matched,failed,extra_words', [
      (["oo_masters_to_config"], True, True, ["Higher than normal", "traffic"]),
      (["oo_masters_to_config", "oo_etcd_to_config"], False, False, []),
      (["oo_etcd_to_config"], False, False, []),
  ])
  def test_log_matches_high_traffic_msg(group_names, matched, failed, extra_words):
      """Check the result of EtcdTraffic.run() against a canned module reply.

      The module executor is replaced by a stub that always answers with the
      parametrized ``matched`` / ``failed`` flags.  The test then verifies
      that every word in ``extra_words`` appears in the result message and
      that the result's ``failed`` flag equals the expected one.
      """
      def fake_execute_module(module_name, *_):
          # Ignore the requested module and its arguments; reply with the
          # flags chosen for this test case.
          return {"matched": matched, "failed": failed}

      host_vars = {
          "group_names": group_names,
          "openshift": {
              "common": {"is_containerized": False},
          },
          "openshift_service_type": "origin",
      }

      outcome = EtcdTraffic(fake_execute_module, host_vars).run()

      # A missing "msg" is treated as an empty message.
      message = outcome.get("msg", "")
      for expected_word in extra_words:
          assert expected_word in message

      # A missing "failed" key counts as "not failed".
      assert outcome.get("failed", False) == failed
  @pytest.mark.parametrize('is_containerized,expected_unit_value', [
      (False, "etcd"),
      (True, "etcd_container"),
  ])
  def test_systemd_unit_matches_deployment_type(is_containerized, expected_unit_value):
      """Check which systemd unit EtcdTraffic.run() asks the journal search for.

      The module executor is replaced by a stub that verifies the requested
      module is ``search_journalctl`` and that every entry of the
      ``log_matchers`` argument carries the unit expected for the given
      ``is_containerized`` setting.

      The stub also records the matchers it inspected so the test can fail
      when nothing was inspected at all, instead of passing vacuously.
      """
      task_vars = dict(
          openshift=dict(
              common=dict(is_containerized=is_containerized),
          )
      )

      # A list (rather than a rebinding) lets the nested stub record what it
      # saw without needing ``nonlocal``.
      checked_matchers = []

      def execute_module(module_name, args, *_):
          assert module_name == "search_journalctl"

          matchers = args["log_matchers"]
          for matcher in matchers:
              assert matcher["unit"] == expected_unit_value
          checked_matchers.extend(matchers)

          return {"failed": False}

      EtcdTraffic(execute_module, task_vars).run()

      # All unit assertions live inside the stub; if run() never invoked it,
      # or passed no matchers, none of them executed.
      assert checked_matchers, "run() did not pass any log matchers to search_journalctl"