# test_oc_csr_approve.py - unit tests for the oc_csr_approve module.

  1. import os
  2. import sys
  3. import pytest
  4. from ansible.module_utils.basic import AnsibleModule
  5. try:
  6. # python3, mock is built in.
  7. from unittest.mock import patch
  8. except ImportError:
  9. # In python2, mock is installed via pip.
  10. from mock import patch
  11. MODULE_PATH = os.path.realpath(os.path.join(__file__, os.pardir, os.pardir, 'library'))
  12. sys.path.insert(1, MODULE_PATH)
  13. import oc_csr_approve # noqa
  14. from oc_csr_approve import CSRapprove # noqa
  15. # base path for text files with sample outputs.
  16. ASSET_PATH = os.path.realpath(os.path.join(__file__, os.pardir, 'test_data'))
  17. RUN_CMD_MOCK = 'ansible.module_utils.basic.AnsibleModule.run_command'
  18. class DummyModule(AnsibleModule):
  19. def _load_params(self):
  20. self.params = {}
  21. def exit_json(*args, **kwargs):
  22. return 0
  23. def fail_json(*args, **kwargs):
  24. raise Exception(kwargs['msg'])
  25. def test_parse_subject_cn():
  26. subject = 'subject=/C=US/CN=fedora1.openshift.io/L=Raleigh/O=Red Hat/ST=North Carolina/OU=OpenShift\n'
  27. assert oc_csr_approve.parse_subject_cn(subject) == 'fedora1.openshift.io'
  28. subject = 'subject=C = US, CN = test.io, L = City, O = Company, ST = State, OU = Dept\n'
  29. assert oc_csr_approve.parse_subject_cn(subject) == 'test.io'
  30. def test_get_nodes():
  31. output_file = os.path.join(ASSET_PATH, 'oc_get_nodes.json')
  32. with open(output_file) as stdoutfile:
  33. oc_get_nodes_stdout = stdoutfile.read()
  34. module = DummyModule({})
  35. approver = CSRapprove(module, 'oc', '/dev/null', [])
  36. with patch(RUN_CMD_MOCK) as call_mock:
  37. call_mock.return_value = (0, oc_get_nodes_stdout, '')
  38. all_nodes = approver.get_nodes()
  39. assert all_nodes == ['fedora1.openshift.io', 'fedora2.openshift.io', 'fedora3.openshift.io']
  40. def test_get_csrs():
  41. module = DummyModule({})
  42. approver = CSRapprove(module, 'oc', '/dev/null', [])
  43. output_file = os.path.join(ASSET_PATH, 'oc_csr_approve_pending.json')
  44. with open(output_file) as stdoutfile:
  45. oc_get_csr_out = stdoutfile.read()
  46. # mock oc get csr call to cluster
  47. with patch(RUN_CMD_MOCK) as call_mock:
  48. call_mock.return_value = (0, oc_get_csr_out, '')
  49. csrs = approver.get_csrs()
  50. assert csrs[0]['kind'] == "CertificateSigningRequest"
  51. output_file = os.path.join(ASSET_PATH, 'openssl1.txt')
  52. with open(output_file) as stdoutfile:
  53. openssl_out = stdoutfile.read()
  54. # mock openssl req call.
  55. node_list = ['fedora2.mguginolocal.com']
  56. approver = CSRapprove(module, 'oc', '/dev/null', node_list)
  57. with patch(RUN_CMD_MOCK) as call_mock:
  58. call_mock.return_value = (0, openssl_out, '')
  59. csr_dict = approver.process_csrs(csrs, "client")
  60. # actually run openssl req call.
  61. csr_dict = approver.process_csrs(csrs, "client")
  62. assert csr_dict['node-csr-TkefytQp8Dz4Xp7uzcw605MocvI0gWuEOGNrHhOjGNQ'] == 'fedora2.mguginolocal.com'
  63. def test_confirm_needed_requests_present():
  64. module = DummyModule({})
  65. csr_dict = {'some-csr': 'fedora1.openshift.io'}
  66. not_found_nodes = ['host1']
  67. approver = CSRapprove(module, 'oc', '/dev/null', [])
  68. with pytest.raises(Exception) as err:
  69. approver.confirm_needed_requests_present(not_found_nodes, csr_dict)
  70. assert 'Exception: Could not find csr for nodes: host1' in str(err)
  71. not_found_nodes = ['fedora1.openshift.io']
  72. # this should complete silently
  73. approver.confirm_needed_requests_present(not_found_nodes, csr_dict)
  74. def test_approve_csrs():
  75. module = DummyModule({})
  76. csr_dict = {'csr-1': 'example.openshift.io'}
  77. approver = CSRapprove(module, 'oc', '/dev/null', [])
  78. with patch(RUN_CMD_MOCK) as call_mock:
  79. call_mock.return_value = (0, 'csr-1 ok', '')
  80. approver.approve_csrs(csr_dict, 'client')
  81. assert approver.result['client_approve_results'] == ['csr-1 ok']
  82. def test_get_ready_nodes_server():
  83. module = DummyModule({})
  84. nodes_list = ['fedora1.openshift.io']
  85. approver = CSRapprove(module, 'oc', '/dev/null', nodes_list)
  86. with patch(RUN_CMD_MOCK) as call_mock:
  87. call_mock.return_value = (0, 'ok', '')
  88. ready_nodes_server = approver.get_ready_nodes_server(nodes_list)
  89. assert ready_nodes_server == ['fedora1.openshift.io']
  90. def test_get_csrs_server():
  91. module = DummyModule({})
  92. output_file = os.path.join(ASSET_PATH, 'oc_csr_server_multiple_pends_one_host.json')
  93. with open(output_file) as stdoutfile:
  94. oc_get_csr_out = stdoutfile.read()
  95. approver = CSRapprove(module, 'oc', '/dev/null', [])
  96. # mock oc get csr call to cluster
  97. with patch(RUN_CMD_MOCK) as call_mock:
  98. call_mock.return_value = (0, oc_get_csr_out, '')
  99. csrs = approver.get_csrs()
  100. assert csrs[0]['kind'] == "CertificateSigningRequest"
  101. output_file = os.path.join(ASSET_PATH, 'openssl1.txt')
  102. with open(output_file) as stdoutfile:
  103. openssl_out = stdoutfile.read()
  104. node_list = ['fedora1.openshift.io']
  105. approver = CSRapprove(module, 'oc', '/dev/null', node_list)
  106. # mock openssl req call.
  107. with patch(RUN_CMD_MOCK) as call_mock:
  108. call_mock.return_value = (0, openssl_out, '')
  109. csr_dict = approver.process_csrs(csrs, "server")
  110. # actually run openssl req call.
  111. node_list = ['fedora2.mguginolocal.com']
  112. approver = CSRapprove(module, 'oc', '/dev/null', node_list)
  113. csr_dict = approver.process_csrs(csrs, "server")
  114. assert csr_dict['csr-2cxkp'] == 'fedora2.mguginolocal.com'
  115. def test_verify_server_csrs():
  116. module = DummyModule({})
  117. ready_nodes_server = ['fedora1.openshift.io']
  118. node_list = ['fedora1.openshift.io']
  119. approver = CSRapprove(module, 'oc', '/dev/null', node_list)
  120. with patch('oc_csr_approve.CSRapprove.get_ready_nodes_server') as call_mock:
  121. call_mock.return_value = ready_nodes_server
  122. # This should silently return
  123. approver.verify_server_csrs()
  124. node_list = ['fedora1.openshift.io', 'fedora2.openshift.io']
  125. approver = CSRapprove(module, 'oc', '/dev/null', node_list)
  126. with patch('oc_csr_approve.CSRapprove.get_ready_nodes_server') as call_mock:
  127. call_mock.return_value = ready_nodes_server
  128. with pytest.raises(Exception) as err:
  129. approver.verify_server_csrs()
  130. assert 'after approving server certs: fedora2.openshift.io' in str(err)
  131. if __name__ == '__main__':
  132. test_parse_subject_cn()
  133. test_get_nodes()
  134. test_get_csrs()
  135. test_confirm_needed_requests_present()
  136. test_approve_csrs()
  137. test_get_ready_nodes_server()
  138. test_get_csrs_server()
  139. test_verify_server_csrs()