Source code for cfme.utils.blockers

# -*- coding: utf-8 -*-
import re
import six
import six.moves.xmlrpc_client
from github import Github
from six.moves.urllib.parse import urlparse

from cfme.fixtures.pytest_store import store
from cfme.utils import classproperty, conf, version
from cfme.utils.bz import Bugzilla
from cfme.utils.log import logger


[docs]class Blocker(object): """Base class for all blockers REQUIRED THING! Any subclass' constructors must accept kwargs and after POPping the values required for the blocker's operation, `call to ``super`` with ``**kwargs`` must be done! Failing to do this will render some of the functionality disabled ;). """ blocks = False kwargs = {} def __init__(self, **kwargs): self.forced_streams = kwargs.pop("forced_streams", []) self.__dict__["kwargs"] = kwargs @property def url(self): raise NotImplementedError('You need to implement .url') @classmethod
[docs] def all_blocker_engines(cls): """Return mapping of name:class of all the blocker engines in this module. Having this as a separate function will later enable to scatter the engines across modules in case of extraction into a separate library. """ return { 'GH': GH, 'BZ': BZ, 'JIRA': JIRA, }
@classmethod
[docs] def parse(cls, blocker, **kwargs): """Create a blocker object from some representation""" if isinstance(blocker, cls): return blocker elif isinstance(blocker, six.string_types): if "#" in blocker: # Generic blocker engine, spec = blocker.split("#", 1) try: engine_class = cls.all_blocker_engines()[engine] except KeyError: raise ValueError( "{} is a wrong engine specification for blocker! ({} available)".format( engine, ", ".join(cls.all_blocker_engines().keys()))) return engine_class(spec, **kwargs) match = re.match('^[A-Z][A-Z0-9]+-[0-9]+$', blocker) if match is not None: # React to the typical JIRA format of FOO-42 return JIRA(blocker) # EXTEND: If someone has other ideas, put them here raise ValueError("Could not parse blocker {}".format(blocker)) else: raise ValueError("Wrong specification of the blockers!")
[docs]class GH(Blocker): DEFAULT_REPOSITORY = conf.env.get("github", {}).get("default_repo") _issue_cache = {} @classproperty def github(cls): if not hasattr(cls, "_github"): token = conf.env.get("github", {}).get("token") if token is not None: cls._github = Github(token) else: cls._github = Github() # Without auth max 60 req/hr return cls._github def __init__(self, description, **kwargs): super(GH, self).__init__(**kwargs) self._repo = None self.issue = None self.upstream_only = kwargs.get('upstream_only', False) self.since = kwargs.get('since') self.until = kwargs.get('until') if isinstance(description, (list, tuple)): try: self.repo, self.issue = description self.issue = int(self.issue) except ValueError: raise ValueError( "The GH issue specification must have 2 items and issue must be number") elif isinstance(description, int): if self.DEFAULT_REPOSITORY is None: raise ValueError("You must specify github/default_repo in env.yaml!") self.issue = description elif isinstance(description, basestring): try: owner, repo, issue_num = re.match(r"^([^/]+)/([^/:]+):([0-9]+)$", str(description).strip()).groups() except AttributeError: raise ValueError( "Could not parse '{}' as a proper GH issue anchor!".format(str(description))) else: self._repo = "{}/{}".format(owner, repo) self.issue = int(issue_num) else: raise ValueError("GH issue specified wrong") @property def data(self): identifier = "{}:{}".format(self.repo, self.issue) if identifier not in self._issue_cache: self._issue_cache[identifier] = self.github.get_repo(self.repo).get_issue(self.issue) return self._issue_cache[identifier] @property def blocks(self): if self.upstream_only and version.appliance_is_downstream(): return False if self.data.state == "closed": return False # Now let's check versions if self.since is None and self.until is None: # No version specifics return True elif self.since is not None and self.until is not None: # since inclusive, until exclusive return self.since <= version.current_version() < self.until elif self.since is not None: # Only since return version.current_version() >= self.since elif self.until is not None: # Only until return version.current_version() < self.until # All branches covered @property def repo(self): return self._repo or self.DEFAULT_REPOSITORY def __str__(self): return "GitHub Issue https://github.com/{}/issues/{}".format(self.repo, self.issue) @property def url(self): return "https://github.com/{}/issues/{}".format(self.repo, self.issue)
[docs]class BZ(Blocker): @classproperty def bugzilla(cls): if not hasattr(cls, "_bugzilla"): try: cls._bugzilla = Bugzilla.from_config() except KeyError: return None return cls._bugzilla def __init__(self, bug_id, **kwargs): self.ignore_bugs = kwargs.pop("ignore_bugs", []) super(BZ, self).__init__(**kwargs) self.bug_id = int(bug_id) @property def data(self): return self.bugzilla.resolve_blocker( self.bug_id, ignore_bugs=self.ignore_bugs, force_block_streams=self.forced_streams) @property def bugzilla_bug(self): if self.data is None: return None return self.data @property def blocks(self): try: bug = self.data if bug is None: return False result = False if bug.is_opened: result = True if bug.upstream_bug: if not version.appliance_is_downstream() and bug.can_test_on_upstream: result = False if not result and version.appliance_is_downstream(): if bug.fixed_in is not None: return version.current_version() < bug.fixed_in return result except six.moves.xmlrpc_client.Fault as e: code = e.faultCode s = e.faultString.strip().split("\n")[0] logger.error("Bugzilla thrown a fault: %s/%s", code, s) logger.warning("Ignoring and taking the bug as non-blocking") store.terminalreporter.write( "Bugzila made a booboo: {}/{}\n".format(code, s), bold=True) return False
[docs] def get_bug_url(self): bz_url = urlparse(self.bugzilla.bugzilla.url) return "{}://{}/show_bug.cgi?id={}".format(bz_url.scheme, bz_url.netloc, self.bug_id)
@property def url(self): return self.get_bug_url() def __str__(self): return "Bugzilla bug {} (or one of its copies)".format(self.get_bug_url())
[docs]class JIRA(Blocker): @classproperty def jira(cls): # noqa if not hasattr(cls, "_jira"): try: from jira import JIRA as JiraClient # noqa cls._jira = JiraClient(conf.env.jira_url, options={'verify': False}) except KeyError: return None return cls._jira def __init__(self, jira_id, **kwargs): super(JIRA, self).__init__(**kwargs) self.jira_id = jira_id @property def url(self): try: jira_url = conf.env.jira_url except KeyError: return None return '{}/browse/{}'.format(jira_url.rstrip('/'), self.jira_id) @property def blocks(self): jira = self.jira if jira is None: # JIRA unspecified, shut up and don't block return False issue = jira.issue(self.jira_id, fields='status') return issue.fields.status.name.lower() != 'done' def __str__(self): return 'Jira card {}'.format(self.url)