etcd_traffic_test.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import pytest
  2. from openshift_checks.etcd_traffic import EtcdTraffic
  3. @pytest.mark.parametrize('group_names,version,is_active', [
  4. (['oo_masters_to_config'], "3.5", False),
  5. (['oo_masters_to_config'], "3.6", False),
  6. (['oo_nodes_to_config'], "3.4", False),
  7. (['oo_etcd_to_config'], "3.4", True),
  8. (['oo_etcd_to_config'], "1.5", True),
  9. (['oo_etcd_to_config'], "3.1", False),
  10. (['oo_masters_to_config', 'oo_nodes_to_config'], "3.5", False),
  11. (['oo_masters_to_config', 'oo_etcd_to_config'], "3.5", True),
  12. ([], "3.4", False),
  13. ])
  14. def test_is_active(group_names, version, is_active):
  15. task_vars = dict(
  16. group_names=group_names,
  17. openshift_image_tag=version,
  18. )
  19. assert EtcdTraffic(task_vars=task_vars).is_active() == is_active
  20. @pytest.mark.parametrize('group_names,matched,failed,extra_words', [
  21. (["oo_masters_to_config"], True, True, ["Higher than normal", "traffic"]),
  22. (["oo_masters_to_config", "oo_etcd_to_config"], False, False, []),
  23. (["oo_etcd_to_config"], False, False, []),
  24. ])
  25. def test_log_matches_high_traffic_msg(group_names, matched, failed, extra_words):
  26. def execute_module(module_name, *_):
  27. return {
  28. "matched": matched,
  29. "failed": failed,
  30. }
  31. task_vars = dict(
  32. group_names=group_names,
  33. openshift=dict(
  34. common=dict(service_type="origin", is_containerized=False),
  35. )
  36. )
  37. result = EtcdTraffic(execute_module, task_vars).run()
  38. for word in extra_words:
  39. assert word in result.get("msg", "")
  40. assert result.get("failed", False) == failed
  41. @pytest.mark.parametrize('is_containerized,expected_unit_value', [
  42. (False, "etcd"),
  43. (True, "etcd_container"),
  44. ])
  45. def test_systemd_unit_matches_deployment_type(is_containerized, expected_unit_value):
  46. task_vars = dict(
  47. openshift=dict(
  48. common=dict(is_containerized=is_containerized),
  49. )
  50. )
  51. def execute_module(module_name, args, *_):
  52. assert module_name == "search_journalctl"
  53. matchers = args["log_matchers"]
  54. for matcher in matchers:
  55. assert matcher["unit"] == expected_unit_value
  56. return {"failed": False}
  57. EtcdTraffic(execute_module, task_vars).run()