Source code for cfme.utils.bz

# -*- coding: utf-8 -*-
import re
from collections import Sequence
from contextlib import contextmanager

from bugzilla import Bugzilla as _Bugzilla
from bugzilla.transport import BugzillaError
from cached_property import cached_property
from miq_version import LATEST
from miq_version import Version
from yaycl import AttrDict

from cfme.utils.conf import credentials
from cfme.utils.conf import env
from cfme.utils.log import logger
from cfme.utils.version import current_version

NONE_FIELDS = {"---", "undefined", "unspecified"}


[docs]class Product(object): def __init__(self, data): self._data = data @property def default_release(self): return Version(self._data["default_release"]) @property def name(self): return self._data["name"] @property def milestones(self): return [ms["name"] for ms in self._data["milestones"]] @property def releases(self): return [release["name"] for release in self._data["releases"]] @property def versions(self): return sorted( Version(version["name"]) for version in self._data["versions"] if version["name"] not in NONE_FIELDS ) @property def latest_version(self): return self.versions[-1]
[docs]class Bugzilla(object): def __init__(self, **kwargs): # __kwargs passed to _Bugzilla instantiation, pop our args out self.__product = kwargs.pop("product", None) self.__config_options = kwargs.pop('config_options', {}) self.__kwargs = kwargs self.__bug_cache = {} self.__product_cache = {} @property def bug_count(self): return len(self.__bug_cache) @property def bugs(self): for bug in self.__bug_cache.values(): yield bug
[docs] def products(self, *names): return [Product(p) for p in self.bugzilla._proxy.Product.get({"names": names})["products"]]
[docs] def product(self, product): if product not in self.__product_cache: self.__product_cache[product] = self.products(product)[0] return self.__product_cache[product]
@property def default_product(self): if self.__product is None: return None return self.product(self.__product)
[docs] @classmethod def from_config(cls): bz_conf = env.get('bugzilla', {}) # default empty so we can call .get() later url = bz_conf.get('url') if url is None: url = 'https://bugzilla.redhat.com/xmlrpc.cgi' logger.warning("No Bugzilla URL specified in conf, using default: %s", url) cred_key = bz_conf.get("bugzilla", {}).get("credentials") return cls(url=url, user=credentials.get(cred_key, {}).get("username"), password=credentials.get(cred_key, {}).get("password"), cookiefile=None, tokenfile=None, product=bz_conf.get("bugzilla", {}).get("product"), config_options=bz_conf)
@cached_property def bugzilla(self): return _Bugzilla(**self.__kwargs) @cached_property def loose(self): return self.__config_options.get("loose", []) @cached_property def open_states(self): return set(self.__config_options.get("skip", [])) @cached_property def upstream_version(self): if self.default_product is not None: return self.default_product.latest_version else: return Version(self.__config_options.get("upstream_version", Version.latest().vstring))
[docs] def get_bug(self, id): id = int(id) if id not in self.__bug_cache: self.__bug_cache[id] = BugWrapper(self, self.bugzilla.getbug(id)) return self.__bug_cache[id]
[docs] def get_bug_variants(self, id): if isinstance(id, BugWrapper): bug = id else: bug = self.get_bug(id) expanded = set([]) found = set([]) stack = set([bug]) while stack: b = stack.pop() if b.status == "CLOSED" and b.resolution == "DUPLICATE": b = self.get_bug(b.dupe_of) found.add(b) if b.copy_of: stack.add(self.get_bug(b.copy_of)) if b not in expanded: for cp in map(self.get_bug, b.copies): found.add(cp) stack.add(cp) expanded.add(b) return found
[docs] def resolve_blocker(self, blocker, version=None, ignore_bugs=None, force_block_streams=None): # ignore_bugs is mutable but is not mutated here! Same force_block_streams force_block_streams = force_block_streams or [] ignore_bugs = set([]) if not ignore_bugs else ignore_bugs if isinstance(id, BugWrapper): bug = blocker else: bug = self.get_bug(blocker) if version is None: version = current_version() if version == LATEST: version = bug.product.latest_version is_upstream = version == bug.product.latest_version variants = self.get_bug_variants(bug) filtered = set([]) version_series = ".".join(str(version).split(".")[:2]) for variant in sorted(variants, key=lambda variant: variant.id): if variant.id in ignore_bugs: continue if variant.version is not None and variant.version > version: continue if variant.release_flag is not None and version.is_in_series(variant.release_flag): logger.info('Found matching bug for %d by release - #%d', bug.id, variant.id) filtered.clear() filtered.add(variant) break elif is_upstream and variant.release_flag == 'future': # It is an upstream bug logger.info('Found a matching upstream bug #%d for bug #%d', variant.id, bug.id) return variant elif (isinstance(variant.version, Version) and isinstance(variant.target_release, Version) and (variant.version.is_in_series(version_series) or variant.target_release.is_in_series(version_series))): filtered.add(variant) else: logger.warning( "ATTENTION!!: No release flags, wrong versions, ignoring %s", variant.id) if not filtered: # No appropriate bug was found for forced_stream in force_block_streams: # Find out if we force this bug. if version.is_in_series(forced_stream): return bug else: # No bug, yipee :) return None # First, use versions for bug in filtered: if (isinstance(bug.version, Version) and isinstance(bug.target_release, Version) and not check_fixed_in(bug.fixed_in, version_series) and (bug.version.is_in_series(version_series) or bug.target_release.is_in_series(version_series))): return bug # Otherwise prefer release_flag for bug in filtered: if bug.release_flag and version.is_in_series(bug.release_flag): return bug return None
[docs] @contextmanager def logged_into_bugzilla(self): bz_creds = credentials.get("bugzilla", None) # login to bzapi if not bz_creds: # error out if there are no creds available in yamls raise BugzillaError("No creds available to log into Bugzilla") try: yield self.bugzilla.login(bz_creds.get("username"), bz_creds.get("password")) except BugzillaError: logger.exception("Unable to login to Bugzilla with those creds.") raise else: self.bugzilla.logout()
[docs] def set_flags(self, idlist, flags): # set the flags with self.logged_into_bugzilla(): for bug in self.bugzilla.getbugs(idlist): result = bug.updateflags(flags) logger.info("Got %s from updating %s", result, bug)
[docs] def get_bz_info(self, idlist): """ Get information about the BZs in idlist """ logger.info("Getting information about the following BZ's: %s", idlist) # build info with self.logged_into_bugzilla(): info = {} for bug_id, bug in zip(idlist, self.bugzilla.getbugs(idlist)): # assign some attrs for each BZ info[bug_id] = AttrDict() info[bug_id].description = bug.description info[bug_id].summary = bug.summary info[bug_id].flags = bug.flags info[bug_id].qa_contact = bug.qa_contact info[bug_id].is_open = bug.is_open info[bug_id].status = bug.status info[bug_id].keywords = bug.keywords info[bug_id].blocks = bug.blocks return info
[docs]def check_fixed_in(fixed_in, version_series): # used to check if the bug belongs to that series if fixed_in is None: return False if not isinstance(fixed_in, Version): fixed_in = Version(fixed_in) return fixed_in.is_in_series(version_series)
[docs]class BugWrapper(object): _copy_matchers = list(map(re.compile, [ r'^[+]{3}\s*This bug is a CFME zstream clone. The original bug is:\s*[+]{3}\n[+]{3}\s*' r'https://bugzilla.redhat.com/show_bug.cgi\?id=(\d+)\.\s*[+]{3}', r"^\+\+\+ This bug was initially created as a clone of Bug #([0-9]+) \+\+\+" ])) def __init__(self, bugzilla, bug): self._bug = bug self._bugzilla = bugzilla @property def loose(self): return self._bugzilla.loose @property def bugzilla(self): return self._bugzilla def __getattr__(self, attr): """This proxies the attribute queries to the Bug object and modifies its result. If the field looked up is specified as loose field, it will be converted to Version. If the field is string and it has zero length, or the value is specified as "not specified", it will return None. """ value = getattr(self._bug, attr) if attr in self.loose: if isinstance(value, Sequence) and not isinstance(value, str): value = value[0] value = value.strip() if not value: return None if value.lower() in NONE_FIELDS: return None # We have to strip any leading non-number characters to correctly match value = re.sub(r"^[^0-9]+", "", value) if not value: return None return Version(value) if isinstance(value, str): if len(value.strip()) == 0: return None else: return value else: return value @property def qa_whiteboard(self): """Returns a set of QA Whiteboard markers. It relies on the fact, that our QA Whiteboard uses format foo:bar:baz. Should be able to handle cases like 'foo::bar', or 'abc:'. """ return {x.strip() for x in self._bug.qa_whiteboard.strip().split(":") if x.strip()} @property def copy_of(self): """Returns either id of the bug this is copy of, or None, if it is not a copy.""" try: first_comment = self._bug.comments[0]["text"].lstrip() except IndexError: return None for copy_matcher in self._copy_matchers: copy_match = copy_matcher.match(first_comment) if copy_match is not None: return int(copy_match.groups()[0]) else: return None @property def copies(self): """Returns list of copies of this bug.""" result = [] for bug_id in self._bug.blocks: bug = self._bugzilla.get_bug(bug_id) if bug.copy_of == self._bug.id: result.append(bug_id) return list(map(int, result)) @property def _release_flag_data(self): for flag in self.flags: if flag["name"].startswith("cfme-"): release_flag = flag["name"].split("-", 1)[-1] if release_flag.endswith(".z"): return release_flag.rsplit(".", 1)[0], True else: return release_flag, False else: return None, False @property def release_flag(self): return self._release_flag_data[0] @property def zstream(self): return self._release_flag_data[1] @property def is_opened(self): states = self._bugzilla.open_states # we consider "POST" and "MODIFIED" to still be open states states.add('POST') states.add('MODIFIED') return self.status in states @property def product(self): return self._bugzilla.product(self._bug.product) @property def upstream_bug(self): if self.version is None: return True return self.version >= self.product.latest_version @property def can_test_on_upstream(self): change_states = {"POST", "MODIFIED"} return self.status in change_states def __repr__(self): return repr(self._bug) def __str__(self): return str(self._bug)