# -*- coding: utf-8 -*-
import re
from urllib.parse import urlparse
from xmlrpc.client import Fault as RPCFault
from github import Github
from cfme.fixtures.pytest_store import store
from cfme.utils import classproperty
from cfme.utils import conf
from cfme.utils import version
from cfme.utils.bz import Bugzilla
from cfme.utils.log import logger
class Blocker(object):
    """Base class for all blockers.

    REQUIRED THING! Any subclass' constructors must accept kwargs and after POPping the values
    required for the blocker's operation, a call to ``super`` with ``**kwargs`` must be done!
    Failing to do this will render some of the functionality disabled ;).
    """
    # Class-level defaults: a bare ``Blocker`` never blocks; subclasses override ``blocks``
    # with a property. ``kwargs`` is replaced per instance in ``__init__``.
    blocks = False
    kwargs = {}

    def __init__(self, **kwargs):
        """Store the generic blocker options.

        Args:
            **kwargs: Options left over after the subclass popped its own. ``forced_streams``
                is popped here (default: empty list); the remainder is kept as ``self.kwargs``.
        """
        self.forced_streams = kwargs.pop("forced_streams", [])
        # Written through ``__dict__`` directly, exactly as the original code does.
        self.__dict__["kwargs"] = kwargs

    @property
    def url(self):
        """URL of the blocking item; must be provided by every concrete engine."""
        raise NotImplementedError('You need to implement .url')

    @classmethod
    def all_blocker_engines(cls):
        """Return mapping of name:class of all the blocker engines in this module.

        Having this as a separate function will later enable to scatter the engines across modules
        in case of extraction into a separate library.
        """
        return {
            'GH': GH,
            'BZ': BZ,
            'JIRA': JIRA,
        }

    @classmethod
    def parse(cls, blocker, **kwargs):
        """Create a blocker object from some representation.

        Accepted representations:

        * an instance of ``cls`` -- returned unchanged,
        * ``"ENGINE#spec"`` -- ``ENGINE`` is looked up in :py:meth:`all_blocker_engines` and
          instantiated with ``spec`` and ``**kwargs``,
        * a string in the typical JIRA format (``FOO-42``) -- a ``JIRA`` blocker.

        Args:
            blocker: The representation to parse.
            **kwargs: Extra options handed to the engine constructor.

        Returns:
            A blocker instance.

        Raises:
            ValueError: If the engine name is unknown, the string cannot be parsed, or
                ``blocker`` is neither a blocker nor a string.
        """
        if isinstance(blocker, cls):
            return blocker
        if not isinstance(blocker, str):
            raise ValueError("Wrong specification of the blockers!")

        if "#" in blocker:
            # Generic blocker
            engine, spec = blocker.split("#", 1)
            engines = cls.all_blocker_engines()
            try:
                engine_class = engines[engine]
            except KeyError:
                # The KeyError itself carries no extra information for the caller.
                raise ValueError(
                    "{} is a wrong engine specification for blocker! ({} available)".format(
                        engine, ", ".join(engines))) from None
            return engine_class(spec, **kwargs)

        if re.match('^[A-Z][A-Z0-9]+-[0-9]+$', blocker) is not None:
            # React to the typical JIRA format of FOO-42. The kwargs are forwarded just like
            # for the generic form, so options such as ``forced_streams`` are not lost.
            return JIRA(blocker, **kwargs)

        # EXTEND: If someone has other ideas, put them here
        raise ValueError("Could not parse blocker {}".format(blocker))
class GH(Blocker):
    """Blocker backed by a GitHub issue.

    The issue may be given as ``(repo, issue)``, as a bare issue number (resolved against
    ``github/default_repo`` from the env configuration) or as an ``"owner/repo:number"`` string.
    """
    DEFAULT_REPOSITORY = conf.env.get("github", {}).get("default_repo")
    # Class-level cache shared by all instances, keyed by "<repo>:<issue>".
    _issue_cache = {}

    @classproperty
    def github(cls):
        """Lazily created GitHub client, cached on the class.

        Uses ``github/token`` from the env configuration when present, otherwise an
        unauthenticated client.
        """
        if not hasattr(cls, "_github"):
            token = conf.env.get("github", {}).get("token")
            if token is not None:
                cls._github = Github(token)
            else:
                cls._github = Github()  # Without auth max 60 req/hr
        return cls._github

    def __init__(self, description, **kwargs):
        """Parse the issue specification.

        Args:
            description: ``(repo, issue)`` list/tuple, an ``int`` issue number, or an
                ``"owner/repo:number"`` string.
            **kwargs: ``upstream_only``, ``since`` and ``until`` are read here; everything is
                also passed on to :py:class:`Blocker`.

        Raises:
            ValueError: If ``description`` cannot be interpreted, or an ``int`` is given while
                no default repository is configured.
        """
        super(GH, self).__init__(**kwargs)
        self._repo = None
        self.issue = None
        self.upstream_only = kwargs.get('upstream_only', False)
        self.since = kwargs.get('since')
        self.until = kwargs.get('until')
        if isinstance(description, (list, tuple)):
            try:
                repo, issue = description
                issue = int(issue)
            except (TypeError, ValueError):
                # Wrong item count -> ValueError; non-numeric issue -> ValueError/TypeError.
                raise ValueError(
                    "The GH issue specification must have 2 items and issue must be number")
            # ``repo`` is a read-only property, so the backing attribute has to be set.
            self._repo = repo
            self.issue = issue
        elif isinstance(description, int):
            if self.DEFAULT_REPOSITORY is None:
                raise ValueError("You must specify github/default_repo in env.yaml!")
            self.issue = description
        elif isinstance(description, str):
            match = re.match(r"^([^/]+)/([^/:]+):([0-9]+)$", description.strip())
            if match is None:
                raise ValueError(
                    "Could not parse '{}' as a proper GH issue anchor!".format(str(description)))
            owner, repo, issue_num = match.groups()
            self._repo = "{}/{}".format(owner, repo)
            self.issue = int(issue_num)
        else:
            raise ValueError("GH issue specified wrong")

    @property
    def data(self):
        """The issue object fetched through the GitHub client (cached per repo/issue)."""
        identifier = "{}:{}".format(self.repo, self.issue)
        if identifier not in self._issue_cache:
            self._issue_cache[identifier] = self.github.get_repo(self.repo).get_issue(self.issue)
        return self._issue_cache[identifier]

    @property
    def blocks(self):
        """Whether the issue currently blocks.

        Never blocks when ``upstream_only`` is set and the appliance is downstream, or when
        the issue is closed. Otherwise blocks if the current version falls into
        ``[since, until)``; a missing bound is unbounded on that side.
        """
        if self.upstream_only and version.appliance_is_downstream():
            return False
        if self.data.state == "closed":
            return False
        # Now let's check versions
        if self.since is None and self.until is None:
            # No version specifics
            return True
        current = version.current_version()
        if self.since is not None and self.until is not None:
            # since inclusive, until exclusive
            return self.since <= current < self.until
        if self.since is not None:
            # Only since
            return current >= self.since
        # Only until
        return current < self.until

    @property
    def repo(self):
        """Explicitly given repository, falling back to ``DEFAULT_REPOSITORY``."""
        return self._repo or self.DEFAULT_REPOSITORY

    def __str__(self):
        return "GitHub Issue {}".format(self.url)

    @property
    def url(self):
        """Web URL of the issue on github.com."""
        return "https://github.com/{}/issues/{}".format(self.repo, self.issue)
class BZ(Blocker):
    """Blocker backed by a Bugzilla bug."""

    @classproperty
    def bugzilla(cls):
        """Lazily created ``Bugzilla`` wrapper cached on the class.

        Returns ``None`` (without caching) when ``Bugzilla.from_config()`` raises ``KeyError``.
        """
        if not hasattr(cls, "_bugzilla"):
            try:
                cls._bugzilla = Bugzilla.from_config()
            except KeyError:
                return None
        return cls._bugzilla

    def __init__(self, bug_id, **kwargs):
        """
        Args:
            bug_id: Bug number; converted with ``int()``.
            **kwargs: ``ignore_bugs`` is popped here (default: empty list); the rest goes to
                :py:class:`Blocker`.
        """
        self.ignore_bugs = kwargs.pop("ignore_bugs", [])
        super(BZ, self).__init__(**kwargs)
        self.bug_id = int(bug_id)

    @property
    def data(self):
        """Result of ``resolve_blocker`` for this bug, or ``None``.

        ``None`` is also returned when no Bugzilla client is available, so that callers
        (which already treat ``None`` as "not a blocker") do not fail with ``AttributeError``.
        """
        bugzilla = self.bugzilla
        if bugzilla is None:
            return None
        return bugzilla.resolve_blocker(
            self.bug_id, ignore_bugs=self.ignore_bugs, force_block_streams=self.forced_streams)

    @property
    def bugzilla_bug(self):
        """Alias of :py:attr:`data` (which may be ``None``)."""
        return self.data

    @property
    def can_test_on_current_upstream_appliance(self):
        """
        This property is designed for use with the blocks property and is answering the question:
        "If I have a bug, and that bug is in a "POST" or "MODIFIED" state, and if I have an
        upstream appliance, is the build date of my current upstream appliance after
        the fix for the bug was merged?"
        If so, then it will return True, because the bug's fix is included in your current
        upstream appliance.
        If you happen to be on an older version of an upstream appliance that doesn't have the fix,
        then it will return False.
        """
        bug = self.data
        if bug is None:  # if bug is None, then it is not a blocker and we can test it
            return True
        if not bug.can_test_on_upstream:  # if bug is not fixed in upstream, we can't test it!
            return False
        change_states = {"POST", "MODIFIED"}
        # With these states, the change is not in upstream
        if bug.status in {"NEW", "ON_DEV", "ASSIGNED"}:
            return False
        history = bug.get_history_raw()["bugs"][0]["history"]
        # We look for status changes in the history; the first one that moved the bug into
        # one of ``change_states`` decides the answer.
        for event in history:
            for change in event["changes"]:
                if change["field_name"].lower() != "status":
                    continue
                if change["added"] in change_states:
                    return event["when"] < version.appliance_build_datetime()
        # No such status change recorded at all.
        return False

    @property
    def blocks(self):
        """Whether the bug currently blocks.

        An opened bug blocks, unless it is an upstream bug whose fix is already in the current
        upstream appliance. On a downstream appliance a non-blocking bug with ``fixed_in`` set
        still blocks while the current version is lower than ``fixed_in``. An XML-RPC fault is
        logged, reported on the terminal and treated as non-blocking.
        """
        try:
            bug = self.data
            if bug is None:
                return False
            result = False
            if bug.is_opened:
                result = True
            if bug.upstream_bug:
                if (not version.appliance_is_downstream() and
                        self.can_test_on_current_upstream_appliance):
                    result = False
            if not result and version.appliance_is_downstream():
                if bug.fixed_in is not None:
                    return version.current_version() < bug.fixed_in
            return result
        except RPCFault as e:
            code = e.faultCode
            # Only the first line of the fault text is reported.
            s = e.faultString.strip().split("\n")[0]
            logger.error("Bugzilla thrown a fault: %s/%s", code, s)
            logger.warning("Ignoring and taking the bug as non-blocking")
            store.terminalreporter.write(
                "Bugzila made a booboo: {}/{}\n".format(code, s), bold=True)
            return False

    def get_bug_url(self):
        """Build the ``show_bug.cgi`` URL from the scheme and host of the configured server."""
        bz_url = urlparse(self.bugzilla.bugzilla.url)
        return "{}://{}/show_bug.cgi?id={}".format(bz_url.scheme, bz_url.netloc, self.bug_id)

    @property
    def url(self):
        """Same as :py:meth:`get_bug_url`."""
        return self.get_bug_url()

    def __str__(self):
        return "Bugzilla bug {} (or one of its copies)".format(self.get_bug_url())
class JIRA(Blocker):
    """Blocker backed by a JIRA card."""

    @classproperty
    def jira(cls):  # noqa
        """JIRA client built from ``conf.env.jira_url`` and cached on the class.

        Yields ``None`` (nothing is cached) when building the client raises ``KeyError``.
        """
        if hasattr(cls, "_jira"):
            return cls._jira
        try:
            from jira import JIRA as JiraClient  # noqa
            # NOTE(review): 'verify': False presumably turns off TLS certificate
            # verification -- confirm this is intended.
            cls._jira = JiraClient(conf.env.jira_url, options={'verify': False})
        except KeyError:
            return None
        return cls._jira

    def __init__(self, jira_id, **kwargs):
        """Remember the card id; remaining kwargs are handled by :py:class:`Blocker`."""
        super(JIRA, self).__init__(**kwargs)
        self.jira_id = jira_id

    @property
    def url(self):
        """Browse URL of the card, or ``None`` when reading ``jira_url`` raises ``KeyError``."""
        try:
            base = conf.env.jira_url
        except KeyError:
            return None
        else:
            return '{}/browse/{}'.format(base.rstrip('/'), self.jira_id)

    @property
    def blocks(self):
        """``True`` unless the card's status name is ``done`` (case-insensitive).

        Without a JIRA client the card never blocks.
        """
        client = self.jira
        if client is None:
            # JIRA unspecified, shut up and don't block
            return False
        status_name = client.issue(self.jira_id, fields='status').fields.status.name
        return status_name.lower() != 'done'

    def __str__(self):
        return 'Jira card {}'.format(self.url)