rpm_version.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #!/usr/bin/python
  2. """
  3. Ansible module for rpm-based systems determining existing package version information in a host.
  4. """
  5. from ansible.module_utils.basic import AnsibleModule
  6. from ansible.module_utils.six import string_types
  7. IMPORT_EXCEPTION = None
  8. try:
  9. import rpm # pylint: disable=import-error
  10. except ImportError as err:
  11. IMPORT_EXCEPTION = err # in tox test env, rpm import fails
  12. class RpmVersionException(Exception):
  13. """Base exception class for package version problems"""
  14. def __init__(self, message, problem_pkgs=None):
  15. Exception.__init__(self, message)
  16. self.problem_pkgs = problem_pkgs
  17. def main():
  18. """Entrypoint for this Ansible module"""
  19. module = AnsibleModule(
  20. argument_spec=dict(
  21. package_list=dict(type="list", required=True),
  22. ),
  23. supports_check_mode=True
  24. )
  25. if IMPORT_EXCEPTION:
  26. module.fail_json(msg="rpm_version module could not import rpm: %s" % IMPORT_EXCEPTION)
  27. # determine the packages we will look for
  28. pkg_list = module.params['package_list']
  29. if not pkg_list:
  30. module.fail_json(msg="package_list must not be empty")
  31. # get list of packages available and complain if any
  32. # of them are missing or if any errors occur
  33. try:
  34. pkg_versions = _retrieve_expected_pkg_versions(_to_dict(pkg_list))
  35. _check_pkg_versions(pkg_versions, _to_dict(pkg_list))
  36. except RpmVersionException as excinfo:
  37. module.fail_json(msg=str(excinfo))
  38. module.exit_json(changed=False)
  39. def _to_dict(pkg_list):
  40. return {pkg["name"]: pkg for pkg in pkg_list}
  41. def _retrieve_expected_pkg_versions(expected_pkgs_dict):
  42. """Search for installed packages matching given pkg names
  43. and versions. Returns a dictionary: {pkg_name: [versions]}"""
  44. transaction = rpm.TransactionSet()
  45. pkgs = {}
  46. for pkg_name in expected_pkgs_dict:
  47. matched_pkgs = transaction.dbMatch("name", pkg_name)
  48. if not matched_pkgs:
  49. continue
  50. for header in matched_pkgs:
  51. if header['name'] == pkg_name:
  52. if pkg_name not in pkgs:
  53. pkgs[pkg_name] = []
  54. pkgs[pkg_name].append(header['version'])
  55. return pkgs
  56. def _check_pkg_versions(found_pkgs_dict, expected_pkgs_dict):
  57. invalid_pkg_versions = {}
  58. not_found_pkgs = []
  59. for pkg_name, pkg in expected_pkgs_dict.items():
  60. if not found_pkgs_dict.get(pkg_name):
  61. not_found_pkgs.append(pkg_name)
  62. continue
  63. found_versions = [_parse_version(version) for version in found_pkgs_dict[pkg_name]]
  64. if isinstance(pkg["version"], string_types):
  65. expected_versions = [_parse_version(pkg["version"])]
  66. else:
  67. expected_versions = [_parse_version(version) for version in pkg["version"]]
  68. if not set(expected_versions) & set(found_versions):
  69. invalid_pkg_versions[pkg_name] = {
  70. "found_versions": found_versions,
  71. "required_versions": expected_versions,
  72. }
  73. if not_found_pkgs:
  74. raise RpmVersionException(
  75. '\n'.join([
  76. "The following packages were not found to be installed: {}".format('\n '.join([
  77. "{}".format(pkg)
  78. for pkg in not_found_pkgs
  79. ]))
  80. ]),
  81. not_found_pkgs,
  82. )
  83. if invalid_pkg_versions:
  84. raise RpmVersionException(
  85. '\n '.join([
  86. "The following packages were found to be installed with an incorrect version: {}".format('\n'.join([
  87. " \n{}\n Required version: {}\n Found versions: {}".format(
  88. pkg_name,
  89. ', '.join(pkg["required_versions"]),
  90. ', '.join([version for version in pkg["found_versions"]]))
  91. for pkg_name, pkg in invalid_pkg_versions.items()
  92. ]))
  93. ]),
  94. invalid_pkg_versions,
  95. )
  96. def _parse_version(version_str):
  97. segs = version_str.split('.')
  98. if not segs or len(segs) <= 2:
  99. return version_str
  100. return '.'.join(segs[0:2])
  101. if __name__ == '__main__':
  102. main()