docker_image_availability_test.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import pytest
  2. from openshift_checks.docker_image_availability import DockerImageAvailability
  3. @pytest.mark.parametrize('deployment_type, is_containerized, group_names, expect_active', [
  4. ("origin", True, [], True),
  5. ("openshift-enterprise", True, [], True),
  6. ("enterprise", True, [], False),
  7. ("online", True, [], False),
  8. ("invalid", True, [], False),
  9. ("", True, [], False),
  10. ("origin", False, [], False),
  11. ("openshift-enterprise", False, [], False),
  12. ("origin", False, ["nodes", "masters"], True),
  13. ("openshift-enterprise", False, ["etcd"], False),
  14. ])
  15. def test_is_active(deployment_type, is_containerized, group_names, expect_active):
  16. task_vars = dict(
  17. openshift=dict(common=dict(is_containerized=is_containerized)),
  18. openshift_deployment_type=deployment_type,
  19. group_names=group_names,
  20. )
  21. assert DockerImageAvailability.is_active(task_vars=task_vars) == expect_active
  22. @pytest.mark.parametrize("is_containerized,is_atomic", [
  23. (True, True),
  24. (False, False),
  25. (True, False),
  26. (False, True),
  27. ])
  28. def test_all_images_available_locally(is_containerized, is_atomic):
  29. def execute_module(module_name, args, task_vars):
  30. if module_name == "yum":
  31. return {"changed": True}
  32. assert module_name == "docker_image_facts"
  33. assert 'name' in args
  34. assert args['name']
  35. return {
  36. 'images': [args['name']],
  37. }
  38. result = DockerImageAvailability(execute_module=execute_module).run(tmp=None, task_vars=dict(
  39. openshift=dict(
  40. common=dict(
  41. service_type='origin',
  42. is_containerized=is_containerized,
  43. is_atomic=is_atomic,
  44. ),
  45. docker=dict(additional_registries=["docker.io"]),
  46. ),
  47. openshift_deployment_type='origin',
  48. openshift_release='v3.4',
  49. openshift_image_tag='3.4',
  50. ))
  51. assert not result.get('failed', False)
  52. @pytest.mark.parametrize("available_locally", [
  53. False,
  54. True,
  55. ])
  56. def test_all_images_available_remotely(available_locally):
  57. def execute_module(module_name, args, task_vars):
  58. if module_name == 'docker_image_facts':
  59. return {'images': [], 'failed': available_locally}
  60. return {'changed': False}
  61. result = DockerImageAvailability(execute_module=execute_module).run(tmp=None, task_vars=dict(
  62. openshift=dict(
  63. common=dict(
  64. service_type='origin',
  65. is_containerized=False,
  66. is_atomic=False,
  67. ),
  68. docker=dict(additional_registries=["docker.io", "registry.access.redhat.com"]),
  69. ),
  70. openshift_deployment_type='origin',
  71. openshift_release='3.4',
  72. openshift_image_tag='v3.4',
  73. ))
  74. assert not result.get('failed', False)
  75. def test_all_images_unavailable():
  76. def execute_module(module_name=None, module_args=None, tmp=None, task_vars=None):
  77. if module_name == "command":
  78. return {
  79. 'failed': True,
  80. }
  81. return {
  82. 'changed': False,
  83. }
  84. check = DockerImageAvailability(execute_module=execute_module)
  85. actual = check.run(tmp=None, task_vars=dict(
  86. openshift=dict(
  87. common=dict(
  88. service_type='origin',
  89. is_containerized=False,
  90. is_atomic=False,
  91. ),
  92. docker=dict(additional_registries=["docker.io"]),
  93. ),
  94. openshift_deployment_type="openshift-enterprise",
  95. openshift_release=None,
  96. openshift_image_tag='latest'
  97. ))
  98. assert actual['failed']
  99. assert "required Docker images are not available" in actual['msg']
  100. @pytest.mark.parametrize("message,extra_words", [
  101. (
  102. "docker image update failure",
  103. ["docker image update failure"],
  104. ),
  105. (
  106. "No package matching 'skopeo' found available, installed or updated",
  107. ["dependencies can be installed via `yum`"]
  108. ),
  109. ])
  110. def test_skopeo_update_failure(message, extra_words):
  111. def execute_module(module_name=None, module_args=None, tmp=None, task_vars=None):
  112. if module_name == "yum":
  113. return {
  114. "failed": True,
  115. "msg": message,
  116. "changed": False,
  117. }
  118. return {'changed': False}
  119. actual = DockerImageAvailability(execute_module=execute_module).run(tmp=None, task_vars=dict(
  120. openshift=dict(
  121. common=dict(
  122. service_type='origin',
  123. is_containerized=False,
  124. is_atomic=False,
  125. ),
  126. docker=dict(additional_registries=["unknown.io"]),
  127. ),
  128. openshift_deployment_type="openshift-enterprise",
  129. openshift_release='',
  130. openshift_image_tag='',
  131. ))
  132. assert actual["failed"]
  133. for word in extra_words:
  134. assert word in actual["msg"]
  135. @pytest.mark.parametrize("deployment_type,registries", [
  136. ("origin", ["unknown.io"]),
  137. ("openshift-enterprise", ["registry.access.redhat.com"]),
  138. ("openshift-enterprise", []),
  139. ])
  140. def test_registry_availability(deployment_type, registries):
  141. def execute_module(module_name=None, module_args=None, tmp=None, task_vars=None):
  142. return {
  143. 'changed': False,
  144. }
  145. actual = DockerImageAvailability(execute_module=execute_module).run(tmp=None, task_vars=dict(
  146. openshift=dict(
  147. common=dict(
  148. service_type='origin',
  149. is_containerized=False,
  150. is_atomic=False,
  151. ),
  152. docker=dict(additional_registries=registries),
  153. ),
  154. openshift_deployment_type=deployment_type,
  155. openshift_release='',
  156. openshift_image_tag='',
  157. ))
  158. assert not actual.get("failed", False)