disk_availability_test.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import pytest
  2. from openshift_checks.disk_availability import DiskAvailability, OpenShiftCheckException
  3. @pytest.mark.parametrize('group_names,is_containerized,is_active', [
  4. (['masters'], False, True),
  5. # ensure check is skipped on containerized installs
  6. (['masters'], True, False),
  7. (['nodes'], False, True),
  8. (['etcd'], False, True),
  9. (['masters', 'nodes'], False, True),
  10. (['masters', 'etcd'], False, True),
  11. ([], False, False),
  12. (['lb'], False, False),
  13. (['nfs'], False, False),
  14. ])
  15. def test_is_active(group_names, is_containerized, is_active):
  16. task_vars = dict(
  17. group_names=group_names,
  18. openshift=dict(common=dict(is_containerized=is_containerized)),
  19. )
  20. assert DiskAvailability.is_active(task_vars=task_vars) == is_active
  21. @pytest.mark.parametrize('ansible_mounts,extra_words', [
  22. ([], ['none']), # empty ansible_mounts
  23. ([{'mount': '/mnt'}], ['/mnt']), # missing relevant mount paths
  24. ([{'mount': '/var'}], ['/var']), # missing size_available
  25. ])
  26. def test_cannot_determine_available_disk(ansible_mounts, extra_words):
  27. task_vars = dict(
  28. group_names=['masters'],
  29. ansible_mounts=ansible_mounts,
  30. )
  31. check = DiskAvailability(execute_module=fake_execute_module)
  32. with pytest.raises(OpenShiftCheckException) as excinfo:
  33. check.run(tmp=None, task_vars=task_vars)
  34. for word in 'determine available disk'.split() + extra_words:
  35. assert word in str(excinfo.value)
  36. @pytest.mark.parametrize('group_names,ansible_mounts', [
  37. (
  38. ['masters'],
  39. [{
  40. 'mount': '/',
  41. 'size_available': 40 * 10**9 + 1,
  42. }],
  43. ),
  44. (
  45. ['nodes'],
  46. [{
  47. 'mount': '/',
  48. 'size_available': 15 * 10**9 + 1,
  49. }],
  50. ),
  51. (
  52. ['etcd'],
  53. [{
  54. 'mount': '/',
  55. 'size_available': 20 * 10**9 + 1,
  56. }],
  57. ),
  58. (
  59. ['etcd'],
  60. [{
  61. # not enough space on / ...
  62. 'mount': '/',
  63. 'size_available': 0,
  64. }, {
  65. # ... but enough on /var
  66. 'mount': '/var',
  67. 'size_available': 20 * 10**9 + 1,
  68. }],
  69. ),
  70. ])
  71. def test_succeeds_with_recommended_disk_space(group_names, ansible_mounts):
  72. task_vars = dict(
  73. group_names=group_names,
  74. ansible_mounts=ansible_mounts,
  75. )
  76. check = DiskAvailability(execute_module=fake_execute_module)
  77. result = check.run(tmp=None, task_vars=task_vars)
  78. assert not result.get('failed', False)
  79. @pytest.mark.parametrize('group_names,ansible_mounts,extra_words', [
  80. (
  81. ['masters'],
  82. [{
  83. 'mount': '/',
  84. 'size_available': 1,
  85. }],
  86. ['0.0 GB'],
  87. ),
  88. (
  89. ['nodes'],
  90. [{
  91. 'mount': '/',
  92. 'size_available': 1 * 10**9,
  93. }],
  94. ['1.0 GB'],
  95. ),
  96. (
  97. ['etcd'],
  98. [{
  99. 'mount': '/',
  100. 'size_available': 1,
  101. }],
  102. ['0.0 GB'],
  103. ),
  104. (
  105. ['nodes', 'masters'],
  106. [{
  107. 'mount': '/',
  108. # enough space for a node, not enough for a master
  109. 'size_available': 15 * 10**9 + 1,
  110. }],
  111. ['15.0 GB'],
  112. ),
  113. (
  114. ['etcd'],
  115. [{
  116. # enough space on / ...
  117. 'mount': '/',
  118. 'size_available': 20 * 10**9 + 1,
  119. }, {
  120. # .. but not enough on /var
  121. 'mount': '/var',
  122. 'size_available': 0,
  123. }],
  124. ['0.0 GB'],
  125. ),
  126. ])
  127. def test_fails_with_insufficient_disk_space(group_names, ansible_mounts, extra_words):
  128. task_vars = dict(
  129. group_names=group_names,
  130. ansible_mounts=ansible_mounts,
  131. )
  132. check = DiskAvailability(execute_module=fake_execute_module)
  133. result = check.run(tmp=None, task_vars=task_vars)
  134. assert result['failed']
  135. for word in 'below recommended'.split() + extra_words:
  136. assert word in result['msg']
  137. def fake_execute_module(*args):
  138. raise AssertionError('this function should not be called')