# -*- coding: utf-8 -*-
import re
from collections import Sequence
import six
from bugzilla import Bugzilla as _Bugzilla
from miq_version import Version, LATEST
from cached_property import cached_property
from cfme.utils.conf import credentials, env
from cfme.utils.log import logger
from cfme.utils.version import current_version, appliance_build_datetime, appliance_is_downstream
NONE_FIELDS = {"---", "undefined", "unspecified"}
[docs]class Product(object):
def __init__(self, data):
self._data = data
@property
def default_release(self):
return Version(self._data["default_release"])
@property
def name(self):
return self._data["name"]
@property
def milestones(self):
return [ms["name"] for ms in self._data["milestones"]]
@property
def releases(self):
return [release["name"] for release in self._data["releases"]]
@property
def versions(self):
return sorted(
Version(version["name"])
for version in self._data["versions"]
if version["name"] not in NONE_FIELDS
)
@property
def latest_version(self):
return self.versions[-1]
[docs]class Bugzilla(object):
def __init__(self, **kwargs):
# __kwargs passed to _Bugzilla instantiation, pop our args out
self.__product = kwargs.pop("product", None)
self.__config_options = kwargs.pop('config_options', {})
self.__kwargs = kwargs
self.__bug_cache = {}
self.__product_cache = {}
@property
def bug_count(self):
return len(self.__bug_cache)
@property
def bugs(self):
for bug in self.__bug_cache.values():
yield bug
[docs] def products(self, *names):
return list(map(Product, self.bugzilla._proxy.Product.get({"names": names})["products"]))
[docs] def product(self, product):
if product not in self.__product_cache:
self.__product_cache[product] = self.products(product)[0]
return self.__product_cache[product]
@property
def default_product(self):
if self.__product is None:
return None
return self.product(self.__product)
@classmethod
[docs] def from_config(cls):
bz_conf = env.get('bugzilla', {}) # default empty so we can call .get() later
url = bz_conf.get('url')
if url is None:
url = 'https://bugzilla.redhat.com/xmlrpc.cgi'
logger.warning("No Bugzilla URL specified in conf, using default: %s", url)
cred_key = bz_conf.get("bugzilla", {}).get("credentials")
return cls(url=url,
user=credentials.get(cred_key, {}).get("username"),
password=credentials.get(cred_key, {}).get("password"),
cookiefile=None,
tokenfile=None,
product=bz_conf.get("bugzilla", {}).get("product"),
config_options=bz_conf)
@cached_property
def bugzilla(self):
return _Bugzilla(**self.__kwargs)
@cached_property
def loose(self):
return self.__config_options.get("loose", [])
@cached_property
def open_states(self):
return set(self.__config_options.get("skip", []))
@cached_property
def upstream_version(self):
if self.default_product is not None:
return self.default_product.latest_version
else:
return Version(self.__config_options.get("upstream_version", Version.latest().vstring))
[docs] def get_bug(self, id):
id = int(id)
if id not in self.__bug_cache:
self.__bug_cache[id] = BugWrapper(self, self.bugzilla.getbug(id))
return self.__bug_cache[id]
[docs] def get_bug_variants(self, id):
if isinstance(id, BugWrapper):
bug = id
else:
bug = self.get_bug(id)
expanded = set([])
found = set([])
stack = set([bug])
while stack:
b = stack.pop()
if b.status == "CLOSED" and b.resolution == "DUPLICATE":
b = self.get_bug(b.dupe_of)
found.add(b)
if b.copy_of:
stack.add(self.get_bug(b.copy_of))
if b not in expanded:
for cp in map(self.get_bug, b.copies):
found.add(cp)
stack.add(cp)
expanded.add(b)
return found
[docs] def resolve_blocker(self, blocker, version=None, ignore_bugs=None, force_block_streams=None):
# ignore_bugs is mutable but is not mutated here! Same force_block_streams
force_block_streams = force_block_streams or []
ignore_bugs = set([]) if not ignore_bugs else ignore_bugs
if isinstance(id, BugWrapper):
bug = blocker
else:
bug = self.get_bug(blocker)
if version is None:
version = current_version()
if version == LATEST:
version = bug.product.latest_version
is_upstream = version == bug.product.latest_version
variants = self.get_bug_variants(bug)
filtered = set([])
version_series = ".".join(str(version).split(".")[:2])
for variant in sorted(variants, key=lambda variant: variant.id):
if variant.id in ignore_bugs:
continue
if variant.version is not None and variant.version > version:
continue
if variant.release_flag is not None and version.is_in_series(variant.release_flag):
logger.info('Found matching bug for %d by release - #%d', bug.id, variant.id)
filtered.clear()
filtered.add(variant)
break
elif is_upstream and variant.release_flag == 'future':
# It is an upstream bug
logger.info('Found a matching upstream bug #%d for bug #%d', variant.id, bug.id)
return variant
elif (isinstance(variant.version, Version) and
isinstance(variant.target_release, Version) and
(variant.version.is_in_series(version_series) or
variant.target_release.is_in_series(version_series))):
filtered.add(variant)
else:
logger.warning(
"ATTENTION!!: No release flags, wrong versions, ignoring %s", variant.id)
if not filtered:
# No appropriate bug was found
for forced_stream in force_block_streams:
# Find out if we force this bug.
if version.is_in_series(forced_stream):
return bug
else:
# No bug, yipee :)
return None
# First, use versions
for bug in filtered:
if (isinstance(bug.version, Version) and
isinstance(bug.target_release, Version) and
check_fixed_in(bug.fixed_in, version_series) and
(bug.version.is_in_series(version_series) or
bug.target_release.is_in_series(version_series))):
return bug
# Otherwise prefer release_flag
for bug in filtered:
if bug.release_flag and version.is_in_series(bug.release_flag):
return bug
return None
[docs]def check_fixed_in(fixed_in, version_series):
# used to check if the bug belongs to that series
if fixed_in is None:
return True
if not isinstance(fixed_in, Version):
fixed_in = Version(fixed_in)
return fixed_in.is_in_series(version_series)
[docs]class BugWrapper(object):
_copy_matchers = map(re.compile, [
r'^[+]{3}\s*This bug is a CFME zstream clone. The original bug is:\s*[+]{3}\n[+]{3}\s*'
'https://bugzilla.redhat.com/show_bug.cgi\?id=(\d+)\.\s*[+]{3}',
r"^\+\+\+ This bug was initially created as a clone of Bug #([0-9]+) \+\+\+"
])
def __init__(self, bugzilla, bug):
self._bug = bug
self._bugzilla = bugzilla
@property
def loose(self):
return self._bugzilla.loose
@property
def bugzilla(self):
return self._bugzilla
[docs] def __getattr__(self, attr):
"""This proxies the attribute queries to the Bug object and modifies its result.
If the field looked up is specified as loose field, it will be converted to Version.
If the field is string and it has zero length, or the value is specified as "not specified",
it will return None.
"""
value = getattr(self._bug, attr)
if attr in self.loose:
if isinstance(value, Sequence) and not isinstance(value, six.string_types):
value = value[0]
value = value.strip()
if not value:
return None
if value.lower() in NONE_FIELDS:
return None
# We have to strip any leading non-number characters to correctly match
value = re.sub(r"^[^0-9]+", "", value)
if not value:
return None
return Version(value)
if isinstance(value, six.string_types):
if len(value.strip()) == 0:
return None
else:
return value
else:
return value
@property
def qa_whiteboard(self):
"""Returns a set of QA Whiteboard markers.
It relies on the fact, that our QA Whiteboard uses format foo:bar:baz.
Should be able to handle cases like 'foo::bar', or 'abc:'.
"""
return {x.strip() for x in self._bug.qa_whiteboard.strip().split(":") if x.strip()}
@property
def copy_of(self):
"""Returns either id of the bug this is copy of, or None, if it is not a copy."""
try:
first_comment = self._bug.comments[0]["text"].lstrip()
except IndexError:
return None
for copy_matcher in self._copy_matchers:
copy_match = copy_matcher.match(first_comment)
if copy_match is not None:
return int(copy_match.groups()[0])
else:
return None
@property
def copies(self):
"""Returns list of copies of this bug."""
result = []
for bug_id in self._bug.blocks:
bug = self._bugzilla.get_bug(bug_id)
if bug.copy_of == self._bug.id:
result.append(bug_id)
return map(int, result)
@property
def _release_flag_data(self):
for flag in self.flags:
if flag["name"].startswith("cfme-"):
release_flag = flag["name"].split("-", 1)[-1]
if release_flag.endswith(".z"):
return release_flag.rsplit(".", 1)[0], True
else:
return release_flag, False
else:
return None, False
@property
def release_flag(self):
return self._release_flag_data[0]
@property
def zstream(self):
return self._release_flag_data[1]
@property
def is_opened(self):
states = self._bugzilla.open_states
if not self.upstream_bug and appliance_is_downstream():
states.add('POST')
states.add('MODIFIED')
return self.status in states
@property
def product(self):
return self._bugzilla.product(self._bug.product)
@property
def upstream_bug(self):
if self.version is None:
return True
return self.version >= self.product.latest_version
@property
def can_test_on_upstream(self):
change_states = {"POST", "MODIFIED"}
# With these states, the change is in upstream
if self.status not in {"POST", "MODIFIED", "ON_QA", "VERIFIED", "RELEASE_PENDING"}:
return False
history = self.get_history_raw()["bugs"][0]["history"]
changes = []
# We look for status changes in the history
for event in history:
for change in event["changes"]:
if change["field_name"].lower() != "status":
continue
if change["added"] in change_states:
changes.append(event["when"])
return event["when"] < appliance_build_datetime()
else:
return False
def __repr__(self):
return repr(self._bug)
def __str__(self):
return str(self._bug)