#!/usr/bin/python
"""
Ansible module for rpm-based systems determining existing package version information in a host.
"""

from ansible.module_utils.basic import AnsibleModule

# The rpm bindings may be missing (e.g. in the tox test env). The import
# failure is recorded here rather than raised so that main() can report it
# through module.fail_json().
IMPORT_EXCEPTION = None
try:
    import rpm  # pylint: disable=import-error
except ImportError as err:
    IMPORT_EXCEPTION = err  # in tox test env, rpm import fails


  11. class RpmVersionException(Exception):
  12. """Base exception class for package version problems"""
  13. def __init__(self, message, problem_pkgs=None):
  14. Exception.__init__(self, message)
  15. self.problem_pkgs = problem_pkgs
  16. def main():
  17. """Entrypoint for this Ansible module"""
  18. module = AnsibleModule(
  19. argument_spec=dict(
  20. package_list=dict(type="list", required=True),
  21. ),
  22. supports_check_mode=True
  23. )
  24. if IMPORT_EXCEPTION:
  25. module.fail_json(msg="rpm_version module could not import rpm: %s" % IMPORT_EXCEPTION)
  26. # determine the packages we will look for
  27. pkg_list = module.params['package_list']
  28. if not pkg_list:
  29. module.fail_json(msg="package_list must not be empty")
  30. # get list of packages available and complain if any
  31. # of them are missing or if any errors occur
  32. try:
  33. pkg_versions = _retrieve_expected_pkg_versions(_to_dict(pkg_list))
  34. _check_pkg_versions(pkg_versions, _to_dict(pkg_list))
  35. except RpmVersionException as excinfo:
  36. module.fail_json(msg=str(excinfo))
  37. module.exit_json(changed=False)
  38. def _to_dict(pkg_list):
  39. return {pkg["name"]: pkg for pkg in pkg_list}
  40. def _retrieve_expected_pkg_versions(expected_pkgs_dict):
  41. """Search for installed packages matching given pkg names
  42. and versions. Returns a dictionary: {pkg_name: [versions]}"""
  43. transaction = rpm.TransactionSet()
  44. pkgs = {}
  45. for pkg_name in expected_pkgs_dict:
  46. matched_pkgs = transaction.dbMatch("name", pkg_name)
  47. if not matched_pkgs:
  48. continue
  49. for header in matched_pkgs:
  50. if header['name'] == pkg_name:
  51. if pkg_name not in pkgs:
  52. pkgs[pkg_name] = []
  53. pkgs[pkg_name].append(header['version'])
  54. return pkgs
  55. def _check_pkg_versions(found_pkgs_dict, expected_pkgs_dict):
  56. invalid_pkg_versions = {}
  57. not_found_pkgs = []
  58. for pkg_name, pkg in expected_pkgs_dict.items():
  59. if not found_pkgs_dict.get(pkg_name):
  60. not_found_pkgs.append(pkg_name)
  61. continue
  62. found_versions = [_parse_version(version) for version in found_pkgs_dict[pkg_name]]
  63. expected_version = _parse_version(pkg["version"])
  64. if expected_version not in found_versions:
  65. invalid_pkg_versions[pkg_name] = {
  66. "found_versions": found_versions,
  67. "required_version": expected_version,
  68. }
  69. if not_found_pkgs:
  70. raise RpmVersionException(
  71. '\n'.join([
  72. "The following packages were not found to be installed: {}".format('\n '.join([
  73. "{}".format(pkg)
  74. for pkg in not_found_pkgs
  75. ]))
  76. ]),
  77. not_found_pkgs,
  78. )
  79. if invalid_pkg_versions:
  80. raise RpmVersionException(
  81. '\n '.join([
  82. "The following packages were found to be installed with an incorrect version: {}".format('\n'.join([
  83. " \n{}\n Required version: {}\n Found versions: {}".format(
  84. pkg_name,
  85. pkg["required_version"],
  86. ', '.join([version for version in pkg["found_versions"]]))
  87. for pkg_name, pkg in invalid_pkg_versions.items()
  88. ]))
  89. ]),
  90. invalid_pkg_versions,
  91. )
  92. def _parse_version(version_str):
  93. segs = version_str.split('.')
  94. if not segs or len(segs) <= 2:
  95. return version_str
  96. return '.'.join(segs[0:2])
  97. if __name__ == '__main__':
  98. main()