123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- # coding: utf-8
- # Copyright (C) 1994-2018 Altair Engineering, Inc.
- # For more information, contact Altair at www.altair.com.
- #
- # This file is part of the PBS Professional ("PBS Pro") software.
- #
- # Open Source License Information:
- #
- # PBS Pro is free software. You can redistribute it and/or modify it under the
- # terms of the GNU Affero General Public License as published by the Free
- # Software Foundation, either version 3 of the License, or (at your option) any
- # later version.
- #
- # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
- # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- # FOR A PARTICULAR PURPOSE.
- # See the GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- # Commercial License Information:
- #
- # For a copy of the commercial license terms and conditions,
- # go to: (http://www.pbspro.com/UserArea/agreement.html)
- # or contact the Altair Legal Department.
- #
- # Altair’s dual-license business model allows companies, individuals, and
- # organizations to create proprietary derivative works of PBS Pro and
- # distribute them - whether embedded or bundled with other software -
- # under a commercial license agreement.
- #
- # Use of Altair’s trademarks, including but not limited to "PBS™",
- # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
- # trademark licensing policies.
- import os
- import sys
- import logging
- from nose.plugins.base import Plugin
- from ptl.utils.pbs_testsuite import PBSTestSuite
# Module-level logger shared by every PTLTestLoader method; the explicit
# name keeps its records under nose's 'nose.plugins' logger hierarchy.
log = logging.getLogger('nose.plugins.PTLTestLoader')
class PTLTestLoader(Plugin):

    """
    Load test cases from given parameter

    The names handed to :meth:`set_data` are sorted into an "include" table
    and an "exclude" table by :meth:`configure`. The ``want*`` hooks then
    consult those tables (through :meth:`check_follow`) to decide which
    classes and methods are collected. A shadow copy of the include table is
    whittled down as requested names are actually found, so that
    :meth:`check_unknown` can report whatever was requested but never seen.
    """
    name = 'PTLTestLoader'
    # NOTE(review): near-maximal score; presumably so that nose consults
    # this plugin ahead of most others -- confirm against nose's ordering.
    score = sys.maxint - 2
    logger = logging.getLogger(__name__)

    def __init__(self):
        Plugin.__init__(self)
        # Raw, comma-split names as supplied through set_data().
        self.suites_list = []
        self.excludes = []
        # When True, check_follow() also tries a class's first base class.
        self.follow = False
        # Reserved table keys: names given as a bare suite / a bare test case.
        self._only_ts = '__only__ts__'
        self._only_tc = '__only__tc__'
        # Prefix that identifies a test method name.
        self._test_marker = 'test_'
        # Tables map suite name -> list of case names, plus the two reserved
        # keys above.
        self._tests_list = {self._only_ts: [], self._only_tc: []}
        self._excludes_list = {self._only_ts: [], self._only_tc: []}
        # Shadow of _tests_list; entries are removed as they are matched.
        self.__tests_list_copy = {self._only_ts: [], self._only_tc: []}
        # Names already accepted once; sets give O(1) membership checks.
        self.__allowed_cls = set()
        self.__allowed_method = set()

    def options(self, parser, env):
        """
        Register command line options

        This plugin registers none; its data arrives through
        :meth:`set_data`.
        """
        pass

    def set_data(self, testgroup, suites, excludes, follow):
        """
        Set the data required for loading test data

        :param testgroup: Path of a readable file whose first line is a
                          comma separated list of names; when readable it
                          takes precedence over ``suites``
        :param suites: Comma separated test suites/cases to load, or None
        :param excludes: Comma separated tests to exclude while running,
                         or None
        :param follow: Whether base classes should be followed when a class
                       itself is not selected
        """
        if os.access(str(testgroup), os.R_OK):
            # Context manager closes the file even if readline() raises.
            with open(testgroup, 'r') as f:
                self.suites_list.extend(f.readline().strip().split(','))
        elif suites is not None:
            self.suites_list.extend(suites.split(','))
        if excludes is not None:
            self.excludes.extend(excludes.split(','))
        self.follow = follow

    def _register_names(self, names, table):
        """
        Sort ``names`` into ``table`` (mutated in place)

        ``Suite.case`` goes under ``table['Suite']``, a bare name starting
        with the test marker goes under the "only test case" key and any
        other bare name goes under the "only test suite" key.

        :param names: Iterable of requested names
        :param table: Dict holding at least the two reserved keys
        """
        only_ts = table[self._only_ts]
        only_tc = table[self._only_tc]
        for name in names:
            if '.' in name:
                # Split on the first dot only so that a name with several
                # dots cannot raise ValueError while unpacking.
                suite, case = name.split('.', 1)
                # A suite-qualified case supersedes the same bare case.
                if case in only_tc:
                    only_tc.remove(case)
                cases = table.setdefault(suite, [])
                if case not in cases:
                    cases.append(case)
            elif name.startswith(self._test_marker):
                if name not in only_tc:
                    only_tc.append(name)
            elif name not in only_ts:
                only_ts.append(name)
        # A suite listed without any case means "the whole suite".
        for key, cases in list(table.items()):
            if key in (self._only_ts, self._only_tc):
                continue
            if not cases:
                only_ts.append(key)
        # A suite requested as a whole overrides its per-case entries.
        for name in only_ts:
            if name in table:
                del table[name]

    def configure(self, options, config):
        """
        Configure the ``plugin`` and ``system``, based on selected options

        Builds the include table, its shadow copy and the exclude table from
        the names stored by :meth:`set_data`. The plugin is enabled only if
        at least one suite name was supplied. The raw name lists are deleted
        afterwards.

        :param options: Parsed command line options (not used here)
        :param config: nose configuration object (not used here)
        """
        self._register_names(self.suites_list, self._tests_list)
        # The shadow copy is built from the same names with the same rules,
        # so it starts out equal to the include table without sharing lists.
        self._register_names(self.suites_list, self.__tests_list_copy)
        self._register_names(self.excludes, self._excludes_list)
        log.debug('included_tests:%s', self._tests_list)
        log.debug('included_tests(copy):%s', self.__tests_list_copy)
        log.debug('excluded_tests:%s', self._excludes_list)
        self.enabled = bool(self.suites_list)
        del self.suites_list
        del self.excludes

    def check_unknown(self):
        """
        Check for unknown test suite and test case

        Anything still present in the shadow copy of the include table was
        requested but never matched. Each such name is logged as an error
        and the process exits.

        :raises SystemExit: with status 1 when an unknown name remains
        """
        log.debug('check_unknown called')
        pending = self.__tests_list_copy
        reserved = (self._only_ts, self._only_tc)
        # Read the table without popping the reserved keys: check_follow()
        # indexes them, so removing them here would make any later call
        # (or a second call of this method) raise KeyError.
        only_ts = pending.get(self._only_ts, [])
        only_tc = pending.get(self._only_tc, [])
        unknown_cases = []
        for suite, cases in pending.items():
            if suite in reserved:
                continue
            unknown_cases.extend(suite + '.' + case for case in cases)
        unknown_cases.extend(only_tc)
        msg = []
        if unknown_cases:
            msg.append('unknown testcase(s): %s' % (','.join(unknown_cases)))
        if only_ts:
            msg.append('unknown testsuite(s): %s' % (','.join(only_ts)))
        if msg:
            for line in msg:
                logging.error(line)
            sys.exit(1)

    def prepareTestLoader(self, loader):
        """
        Prepare test loader

        Wraps ``loader.loadTestsFromNames`` so that :meth:`check_unknown`
        runs right after the original call returns.

        :param loader: Test loader object to patch
        :returns: The same loader, patched
        """
        old_loadTestsFromNames = loader.loadTestsFromNames

        def check_loadTestsFromNames(names, module=None):
            rv = old_loadTestsFromNames(names, module)
            self.check_unknown()
            return rv
        loader.loadTestsFromNames = check_loadTestsFromNames
        return loader

    def check_follow(self, cls, method=None):
        """
        Decide whether a class, or a method of that class, is selected

        Matching entries are removed from the shadow copy of the include
        table as a side effect.

        :param cls: Class to check
        :param method: Method of ``cls`` to check, or None to check only
                       the class
        :returns: True if selected else False
        """
        cname = cls.__name__
        if not issubclass(cls, PBSTestSuite):
            return False
        # The base suite itself is only selected when explicitly requested.
        if cname == 'PBSTestSuite':
            if 'PBSTestSuite' not in self._tests_list[self._only_ts]:
                return False
        if cname in self._excludes_list[self._only_ts]:
            return False
        if cname in self._tests_list[self._only_ts]:
            if cname in self.__tests_list_copy[self._only_ts]:
                self.__tests_list_copy[self._only_ts].remove(cname)
            return True
        if cname in self._tests_list and method is None:
            return True
        if method is not None:
            mname = method.__name__
            if not mname.startswith(self._test_marker):
                return False
            if mname in self._excludes_list[self._only_tc]:
                return False
            if (cname in self._excludes_list and
                    mname in self._excludes_list[cname]):
                return False
            if (cname in self._tests_list and
                    mname in self._tests_list[cname]):
                if cname in self.__tests_list_copy:
                    pending = self.__tests_list_copy[cname]
                    if mname in pending:
                        pending.remove(mname)
                    # Drop the suite once all of its cases were matched.
                    if not pending:
                        del self.__tests_list_copy[cname]
                return True
            if mname in self._tests_list[self._only_tc]:
                if mname in self.__tests_list_copy[self._only_tc]:
                    self.__tests_list_copy[self._only_tc].remove(mname)
                return True
        if self.follow:
            # Only the first base class is followed.
            return self.check_follow(cls.__bases__[0], method)
        return False

    def is_already_allowed(self, cls, method=None):
        """
        Check whether a class or method was accepted before, recording it
        as accepted if it was not

        :param cls: Class to check
        :param method: Method to check
        :returns: True if method is already allowed else False
        """
        name = cls.__name__
        if method is not None:
            name += '.' + method.__name__
            seen = self.__allowed_method
        else:
            seen = self.__allowed_cls
        if name in seen:
            return True
        seen.add(name)
        return False

    def wantClass(self, cls):
        """
        Is the class wanted?

        :param cls: Candidate class
        :returns: False when the class is rejected, None otherwise
        """
        if not any(t.startswith(self._test_marker) for t in dir(cls)):
            return False
        # is_already_allowed() records the class, so it must only run for
        # classes that check_follow() accepted (hence the short-circuit).
        if not self.check_follow(cls) or self.is_already_allowed(cls):
            return False
        log.debug('wantClass:%s', cls)
        # NOTE(review): None (not True) is what the original returned here;
        # nose documents None as "no opinion" -- confirm this is intended.
        return None

    def wantFunction(self, function):
        """
        Is the function wanted?

        Delegates to :meth:`wantMethod`.
        """
        return self.wantMethod(function)

    def wantMethod(self, method):
        """
        Is the method wanted?

        :param method: Candidate method; objects without ``im_class`` are
                       rejected
        :returns: False when the method is rejected, None otherwise
        """
        try:
            cls = method.im_class
        except AttributeError:
            return False
        if not method.__name__.startswith(self._test_marker):
            return False
        # Same short-circuit as wantClass(): only record accepted methods.
        if (not self.check_follow(cls, method) or
                self.is_already_allowed(cls, method)):
            return False
        log.debug('wantMethod:%s', method)
        # NOTE(review): None (not True) is what the original returned here;
        # nose documents None as "no opinion" -- confirm this is intended.
        return None
|