123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- import pytest
- from openshift_checks import OpenShiftCheckException
- from openshift_checks.docker_storage import DockerStorage
- def dummy_check(execute_module=None):
- def dummy_exec(self, status, task_vars):
- raise Exception("dummy executor called")
- return DockerStorage(execute_module=execute_module or dummy_exec)
- @pytest.mark.parametrize('is_containerized, group_names, is_active', [
- (False, ["masters", "etcd"], False),
- (False, ["masters", "nodes"], True),
- (True, ["etcd"], True),
- ])
- def test_is_active(is_containerized, group_names, is_active):
- task_vars = dict(
- openshift=dict(common=dict(is_containerized=is_containerized)),
- group_names=group_names,
- )
- assert DockerStorage.is_active(task_vars=task_vars) == is_active
- non_atomic_task_vars = {"openshift": {"common": {"is_atomic": False}}}
- @pytest.mark.parametrize('docker_info, failed, expect_msg', [
- (
- dict(failed=True, msg="Error connecting: Error while fetching server API version"),
- True,
- ["Is docker running on this host?"],
- ),
- (
- dict(msg="I have no info"),
- True,
- ["missing info"],
- ),
- (
- dict(info={
- "Driver": "devicemapper",
- "DriverStatus": [("Pool Name", "docker-docker--pool")],
- }),
- False,
- [],
- ),
- (
- dict(info={
- "Driver": "devicemapper",
- "DriverStatus": [("Data loop file", "true")],
- }),
- True,
- ["loopback devices with the Docker devicemapper storage driver"],
- ),
- (
- dict(info={
- "Driver": "overlay2",
- "DriverStatus": []
- }),
- False,
- [],
- ),
- (
- dict(info={
- "Driver": "overlay",
- }),
- True,
- ["unsupported Docker storage driver"],
- ),
- (
- dict(info={
- "Driver": "unsupported",
- }),
- True,
- ["unsupported Docker storage driver"],
- ),
- ])
- def test_check_storage_driver(docker_info, failed, expect_msg):
- def execute_module(module_name, module_args, tmp=None, task_vars=None):
- if module_name == "yum":
- return {}
- if module_name != "docker_info":
- raise ValueError("not expecting module " + module_name)
- return docker_info
- check = dummy_check(execute_module=execute_module)
- check._check_dm_usage = lambda status, task_vars: dict() # stub out for this test
- result = check.run(tmp=None, task_vars=non_atomic_task_vars)
- if failed:
- assert result["failed"]
- else:
- assert not result.get("failed", False)
- for word in expect_msg:
- assert word in result["msg"]
- enough_space = {
- "Pool Name": "docker--vg-docker--pool",
- "Data Space Used": "19.92 MB",
- "Data Space Total": "8.535 GB",
- "Metadata Space Used": "40.96 kB",
- "Metadata Space Total": "25.17 MB",
- }
- not_enough_space = {
- "Pool Name": "docker--vg-docker--pool",
- "Data Space Used": "10 GB",
- "Data Space Total": "10 GB",
- "Metadata Space Used": "42 kB",
- "Metadata Space Total": "43 kB",
- }
- @pytest.mark.parametrize('task_vars, driver_status, vg_free, success, expect_msg', [
- (
- {"max_thinpool_data_usage_percent": "not a float"},
- enough_space,
- "12g",
- False,
- ["is not a percentage"],
- ),
- (
- {},
- {}, # empty values from driver status
- "bogus", # also does not parse as bytes
- False,
- ["Could not interpret", "as bytes"],
- ),
- (
- {},
- enough_space,
- "12.00g",
- True,
- [],
- ),
- (
- {},
- not_enough_space,
- "0.00",
- False,
- ["data usage", "metadata usage", "higher than threshold"],
- ),
- ])
- def test_dm_usage(task_vars, driver_status, vg_free, success, expect_msg):
- check = dummy_check()
- check._get_vg_free = lambda pool, task_vars: vg_free
- result = check._check_dm_usage(driver_status, task_vars)
- result_success = not result.get("failed")
- assert result_success is success
- for msg in expect_msg:
- assert msg in result["msg"]
- @pytest.mark.parametrize('pool, command_returns, raises, returns', [
- (
- "foo-bar",
- { # vgs missing
- "msg": "[Errno 2] No such file or directory",
- "failed": True,
- "cmd": "/sbin/vgs",
- "rc": 2,
- },
- "Failed to run /sbin/vgs",
- None,
- ),
- (
- "foo", # no hyphen in name - should not happen
- {},
- "name does not have the expected format",
- None,
- ),
- (
- "foo-bar",
- dict(stdout=" 4.00g\n"),
- None,
- "4.00g",
- ),
- (
- "foo-bar",
- dict(stdout="\n"), # no matching VG
- "vgs did not find this VG",
- None,
- )
- ])
- def test_vg_free(pool, command_returns, raises, returns):
- def execute_module(module_name, module_args, tmp=None, task_vars=None):
- if module_name != "command":
- raise ValueError("not expecting module " + module_name)
- return command_returns
- check = dummy_check(execute_module=execute_module)
- if raises:
- with pytest.raises(OpenShiftCheckException) as err:
- check._get_vg_free(pool, {})
- assert raises in str(err.value)
- else:
- ret = check._get_vg_free(pool, {})
- assert ret == returns
- @pytest.mark.parametrize('string, expect_bytes', [
- ("12", 12.0),
- ("12 k", 12.0 * 1024),
- ("42.42 MB", 42.42 * 1024**2),
- ("12g", 12.0 * 1024**3),
- ])
- def test_convert_to_bytes(string, expect_bytes):
- got = DockerStorage._convert_to_bytes(string)
- assert got == expect_bytes
- @pytest.mark.parametrize('string', [
- "bork",
- "42 Qs",
- ])
- def test_convert_to_bytes_error(string):
- with pytest.raises(ValueError) as err:
- DockerStorage._convert_to_bytes(string)
- assert "Cannot convert" in str(err.value)
- assert string in str(err.value)
|