conftest.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # pylint: disable=missing-docstring,invalid-name,redefined-outer-name
  2. import os
  3. import pytest
  4. import sys
  5. from OpenSSL import crypto
  6. sys.path.insert(1, os.path.join(os.path.dirname(__file__), os.pardir, "lookup_plugins"))
  7. from openshift_master_facts_default_predicates import LookupModule as PredicatesLookupModule # noqa: E402
  8. from openshift_master_facts_default_priorities import LookupModule as PrioritiesLookupModule # noqa: E402
  # Parameter sets consumed by the ``valid_cert`` fixture; each dict describes
  # one certificate to generate:
  #   short_name - label used as the pytest parameter id
  #   cn         - subject common name
  #   serial     - certificate serial number
  #   uses       - extendedKeyUsage value (bytes)
  #   dns / ip   - extra subjectAltName entries (may be empty)
  VALID_CERTIFICATE_PARAMS = [
      {
          'short_name': 'client',
          'cn': 'client.example.com',
          'serial': 4,
          'uses': b'clientAuth',
          'dns': [],
          'ip': [],
      },
      {
          'short_name': 'server',
          'cn': 'server.example.com',
          'serial': 5,
          'uses': b'serverAuth',
          'dns': ['kubernetes', 'openshift'],
          'ip': ['10.0.0.1', '192.168.0.1'],
      },
      {
          'short_name': 'combined',
          'cn': 'combined.example.com',
          # A deliberately HUGE serial, to check that such values parse
          # correctly (frobs PARSING_HEX_SERIAL in _parse_cert).
          # See https://bugzilla.redhat.com/show_bug.cgi?id=1464240
          'serial': 14449739080294792594019643629255165375,
          'uses': b'clientAuth, serverAuth',
          'dns': ['etcd'],
          'ip': ['10.0.0.2', '192.168.0.2'],
      },
  ]

  # Human-friendly ids for the valid_cert fixture, taken from each parameter
  # set's short_name (same order as VALID_CERTIFICATE_PARAMS).
  VALID_CERTIFICATE_IDS = [spec['short_name'] for spec in VALID_CERTIFICATE_PARAMS]
  @pytest.fixture(scope='session')
  def ca(tmpdir_factory):
      """Create a throwaway self-signed CA, built once per test session.

      Args:
          tmpdir_factory: pytest's temporary-directory factory fixture.

      Returns:
          dict with three keys: ``dir`` (the temporary directory created
          for the CA), ``key`` (the CA's RSA key pair) and ``cert`` (the
          signed CA certificate).
      """
      ca_dir = tmpdir_factory.mktemp('ca')

      key = crypto.PKey()
      key.generate_key(crypto.TYPE_RSA, 2048)

      cert = crypto.X509()
      # X509.set_version() takes the zero-based encoded value, so 2 selects
      # X.509 v3 -- the version needed for a certificate with extensions.
      # (The previous value of 3 encoded a non-existent "v4".)
      cert.set_version(2)
      cert.set_serial_number(1)
      cert.get_subject().commonName = 'test-signer'
      # Valid from now for the next 24 hours.
      cert.gmtime_adj_notBefore(0)
      cert.gmtime_adj_notAfter(24 * 60 * 60)
      # Self-signed: the issuer is the certificate's own subject.
      cert.set_issuer(cert.get_subject())
      cert.set_pubkey(key)
      cert.add_extensions([
          crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
          crypto.X509Extension(b'keyUsage', True,
                               b'digitalSignature, keyEncipherment, keyCertSign, cRLSign'),
          crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', subject=cert)
      ])
      # Added in a second pass: 'keyid:always' is derived from the issuer's
      # subjectKeyIdentifier, which therefore has to be on the cert already.
      cert.add_extensions([
          crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid:always', issuer=cert)
      ])
      cert.sign(key, 'sha256')

      return {
          'dir': ca_dir,
          'key': key,
          'cert': cert,
      }
  @pytest.fixture(scope='session',
                  ids=VALID_CERTIFICATE_IDS,
                  params=VALID_CERTIFICATE_PARAMS)
  def valid_cert(request, ca):
      """Issue a certificate signed by the test CA and write it to disk.

      Parametrized over VALID_CERTIFICATE_PARAMS, so one certificate is
      produced per parameter set.

      Args:
          request: pytest request object; ``request.param`` is the current
              parameter dict (keys ``cn``, ``serial``, ``uses``, ``dns``, ``ip``).
          ca: result of the ``ca`` fixture (keys ``dir``, ``key``, ``cert``).

      Returns:
          dict with ``common_name``, ``serial``, ``dns``, ``ip`` and ``uses``
          copied from the parameters, plus ``cert_file`` (path of the PEM
          file written into the CA directory) and ``cert`` (the X509 object).
      """
      common_name = request.param['cn']

      key = crypto.PKey()
      key.generate_key(crypto.TYPE_RSA, 2048)

      cert = crypto.X509()
      cert.set_serial_number(request.param['serial'])
      # Valid from now for the next 24 hours.
      cert.gmtime_adj_notBefore(0)
      cert.gmtime_adj_notAfter(24 * 60 * 60)
      cert.set_issuer(ca['cert'].get_subject())
      cert.set_pubkey(key)
      # X509.set_version() takes the zero-based encoded value, so 2 selects
      # X.509 v3 -- the version needed for a certificate with extensions.
      # (The previous value of 3 encoded a non-existent "v4".)
      cert.set_version(2)
      cert.get_subject().commonName = common_name
      cert.add_extensions([
          crypto.X509Extension(b'basicConstraints', True, b'CA:FALSE'),
          crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'),
          crypto.X509Extension(b'extendedKeyUsage', False, request.param['uses']),
      ])

      # A subjectAltName extension is only added when extra DNS names or IPs
      # were requested; the common name is then listed first as a DNS entry.
      if request.param['dns'] or request.param['ip']:
          san_list = ['DNS:{}'.format(common_name)]
          san_list.extend(['DNS:{}'.format(x) for x in request.param['dns']])
          san_list.extend(['IP:{}'.format(x) for x in request.param['ip']])

          cert.add_extensions([
              crypto.X509Extension(b'subjectAltName', False, ', '.join(san_list).encode('utf8'))
          ])
      cert.sign(ca['key'], 'sha256')

      # Store the PEM-encoded certificate next to the CA as "<cn>.crt".
      cert_contents = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
      cert_file = ca['dir'].join('{}.crt'.format(common_name))
      cert_file.write_binary(cert_contents)

      return {
          'common_name': common_name,
          'serial': request.param['serial'],
          'dns': request.param['dns'],
          'ip': request.param['ip'],
          'uses': request.param['uses'],
          'cert_file': cert_file,
          'cert': cert
      }
  @pytest.fixture()
  def predicates_lookup():
      """Provide a newly constructed default-predicates lookup plugin."""
      lookup = PredicatesLookupModule()
      return lookup
  @pytest.fixture()
  def priorities_lookup():
      """Provide a newly constructed default-priorities lookup plugin."""
      lookup = PrioritiesLookupModule()
      return lookup
  @pytest.fixture()
  def facts():
      """Provide a minimal facts dict: ``openshift.common`` present but empty."""
      openshift_facts = {'common': {}}
      return {'openshift': openshift_facts}
  @pytest.fixture(params=[True, False])
  def regions_enabled(request):
      """Parametrized flag: yields True, then False, to dependent tests."""
      enabled = request.param
      return enabled
  @pytest.fixture(params=[True, False])
  def zones_enabled(request):
      """Parametrized flag: yields True, then False, to dependent tests."""
      enabled = request.param
      return enabled
  def v_prefix(release):
      """Return *release* with a leading 'v', e.g. '3.6' -> 'v3.6'."""
      prefix = "v"
      return prefix + release
  def minor(release):
      """Return *release* with '.1' appended, turning 'X.Y' into 'X.Y.1'."""
      suffix = ".1"
      return release + suffix
  @pytest.fixture(params=[str, v_prefix, minor])
  def release_mod(request):
      """Provide a callable that rewrites a release string.

      Parametrized over ``str``, ``v_prefix`` and ``minor``, so dependent
      tests run once with each alternative form of the release value.
      """
      modifier = request.param
      return modifier