# -*- coding: utf-8 -*-
"""Library for event testing.
"""
from time import sleep
from threading import Thread, Event as ThreadEvent
from cfme.utils.log import create_sublogger
from manageiq_client.filters import Q
# Module-wide logger used by every class below; obtained from cfme's create_sublogger
# helper with the name 'events'.
logger = create_sublogger('events')
class EventAttr(object):
    """ EventAttr helps to compare event attributes with specific method.

    Contains one event attribute and the method for comparing it.

    Args:
        attr_type: explicit type of the value; when omitted ``type(value)`` is stored.
        cmp_func: optional callable ``cmp_func(own_value, other_value)`` that :meth:`match`
            uses instead of ``==``.
        **attrs: exactly one ``name=value`` pair describing the attribute.

    Raises:
        ValueError: if anything other than exactly one ``name=value`` pair is passed.
    """

    def __init__(self, attr_type=None, cmp_func=None, **attrs):
        if len(attrs) != 1:
            raise ValueError('event attribute can have only one key=value pair')
        # dict views cannot be indexed on Python 3, so pull the single pair through an iterator
        self.name, self.value = next(iter(attrs.items()))
        self.type = attr_type or type(self.value)
        self.cmp_func = cmp_func

    def match(self, attr):
        """ Compares current attribute with passed attribute.

        Args:
            attr: the :class:`EventAttr` to compare against; must carry the same name.

        Returns:
            The comparison result. When either value is ``None`` the attributes match only
            if both are ``None``; otherwise the result of ``cmp_func`` (when set) or of
            plain equality.

        Raises:
            ValueError: if ``attr`` is not an EventAttr or has a different name.
        """
        if not isinstance(attr, EventAttr) or self.name != attr.name:
            raise ValueError('Incorrect attribute is passed')
        # Only None means "no value"; falsy values such as 0 or '' are real values and
        # must be compared normally.
        if attr.value is None or self.value is None:
            return attr.value is None and self.value is None
        if self.cmp_func:
            return self.cmp_func(self.value, attr.value)
        return self.value == attr.value

    def __repr__(self):
        return "{name}({type})={val}, cmp_func {cmp}".format(name=self.name, type=self.type,
                                                             val=self.value, cmp=self.cmp_func)
class Event(object):
    """ Event represents either event received by REST API or an expected event.

    Attributes are kept in ``event_attrs``, a dict mapping attribute name to its
    EventAttr object.

    :var TARGET_TYPES: Mapping of object types to REST API collections.

    Args:
        appliance: object whose ``rest_api.collections`` is used to resolve targets.
        *args: EventAttr objects to attach; anything else is logged and ignored.
    """

    TARGET_TYPES = {
        # target_type: target_rest
        'VmOrTemplate': 'vms',
        'Host': 'hosts',
        'Service': 'services',
    }

    def __init__(self, appliance, *args):
        self._appliance = appliance
        self.event_attrs = {}  # container for EventAttr objects
        for arg in args:
            if isinstance(arg, EventAttr):
                self.add_attrs(arg)
            else:
                logger.warning("arg {} doesn't belong to EventAttr. ignoring it".format(arg))

    def __repr__(self):
        # The "BaseEvent" label is the string this class has always emitted; kept as-is.
        params = ", ".join("{}={}".format(attr.name, attr.value)
                           for attr in self.event_attrs.values())
        return "BaseEvent({})".format(params)

    def process_id(self):
        """ Resolves target_id by target_type and target name.

        Does nothing unless ``target_name`` is set and ``target_id`` is not. Otherwise the
        target is looked up by name in the REST collection mapped from ``target_type`` and
        its id is stored as the ``target_id`` attribute. When no such object exists the
        method sleeps for one second and returns without setting ``target_id``.

        Raises:
            TypeError: if the target type is not listed in TARGET_TYPES.
            KeyError: if ``target_name`` is set but ``target_type`` is not.
        """
        if 'target_name' not in self.event_attrs or 'target_id' in self.event_attrs:
            return
        try:
            target_type = self.event_attrs['target_type'].value
            target_name = self.event_attrs['target_name'].value

            # Target type should be present in TARGET_TYPES
            if target_type not in self.TARGET_TYPES:
                raise TypeError(
                    'Type {} is not specified in the TARGET_TYPES.'.format(target_type))

            target_rest = self.TARGET_TYPES[target_type]
            target_collection = getattr(self._appliance.rest_api.collections, target_rest)
            o = target_collection.filter(Q('name', '=', target_name))

            if not o.resources:
                raise ValueError('{} with name {} not found.'.format(target_type, target_name))

            # Set target_id if target object was found
            self.event_attrs['target_id'] = EventAttr(**{'target_id': o[0].id})
        except ValueError:
            # Target isn't added yet. Need to wait
            sleep(1)

    def matches(self, evt):
        """ Compares common attributes of expected event and passed event.

        Args:
            evt: the Event to compare with.

        Returns:
            True when every attribute name present in both events matches (also when the
            events share no attribute names), False otherwise.
        """
        common_attrs = set(self.event_attrs).intersection(evt.event_attrs)
        return all(self.event_attrs[name].match(evt.event_attrs[name])
                   for name in common_attrs)

    def add_attrs(self, *attrs):
        """ Adds one or more EventAttr objects to event, keyed by attribute name.

        An attribute with an already stored name replaces the previous one.

        Returns:
            The event itself, so calls can be chained.
        """
        for attr in attrs:
            self.event_attrs[attr.name] = attr
        return self

    def build_from_entity(self, event_entity):
        """ Builds Event object from event Entity.

        Every key/value pair found under ``event_entity['_data']`` becomes an EventAttr.

        Returns:
            The event itself.
        """
        for key, value in event_entity['_data'].items():
            self.add_attrs(EventAttr(**{key: value}))
        return self
class RestEventListener(Thread):
    """ EventListener accepts "expected" events, listens to db events and compares matched events
    with expected events. Runs callback function if expected events have it.

    Each registration is kept as a dict with the keys ``event``, ``callback``,
    ``matched_events`` and ``first_event``.

    :var FILTER_ATTRS: List of filters used in REST API call
    """

    FILTER_ATTRS = ['event_type', 'target_type', 'target_id', 'source']

    def __init__(self, appliance):
        super(RestEventListener, self).__init__()
        self._appliance = appliance
        self._events_to_listen = []
        self._last_processed_id = 0  # this is used to filter out old or processed events
        self._stop_event = ThreadEvent()
        self.event_streams = appliance.rest_api.collections.event_streams

    def set_last_record(self):
        """ Sets last_processed_id to the latest event.

        Leaves the current value untouched when the event stream query returns nothing.
        """
        last_event_stream = self.event_streams.query_string(limit=1,
                                                            sort_order='desc',
                                                            sort_by='id')
        if len(last_event_stream):
            self._last_processed_id = last_event_stream[0].id

    def new_event(self, *attrs, **kwattrs):
        """ This method simplifies "expected" event creation.

        Keyword arguments become plain ``name=value`` attributes. Each positional argument
        is a dict of keyword arguments passed on to EventAttr.

        Usage:

            listener = appliance.event_listener()
            evt = listener.new_event(target_type='VmOrTemplate',
                                     target_name='my_lovely_vm',
                                     event_type='vm_create')
            listener.listen_to(evt)

        Returns:
            The newly built Event (it is not registered for listening yet).
        """
        event = Event(self._appliance)
        for name, value in kwattrs.items():
            event.add_attrs(EventAttr(**{name: value}))
        for attr in attrs:
            event.add_attrs(EventAttr(**attr))
        return event

    def listen_to(self, *evts, **kwargs):
        """ Adds expected events to EventListener

        May accept one or many events.
        Callback function is called when expected event has arrived in event_streams.
        Callback will receive expected event and got event as params
        (keywords ``exp_event`` and ``got_event``).

        Args:
            evts: list of events which EventListener should listen to
            callback: callback function that will be called if event is received
            first_event: EventListener will skip processing event if it has occurred once.
                By default EventListener collects and receives all matching events.

        Raises:
            ValueError: if one of ``evts`` is not an Event; events listed before it
                have already been registered at that point.
        """
        callback = kwargs.get('callback')
        first_event = bool(kwargs.get('first_event'))

        for evt in evts:
            if not isinstance(evt, Event):
                raise ValueError("one of events doesn't belong to Event class")
            self._events_to_listen.append({'event': evt,
                                           'callback': callback,
                                           'matched_events': [],
                                           'first_event': first_event})
            logger.info("event {} is added to listening queue.".format(evt))

    def start(self):
        """ Records the latest event id, clears the stop flag and starts the thread."""
        self.set_last_record()
        self._stop_event.clear()
        super(RestEventListener, self).start()
        logger.info('Event Listener has been started')

    def stop(self):
        """ Asks the polling loop to finish by setting the stop flag; does not join."""
        self._stop_event.set()
        logger.info('Event Listener has been stopped')

    @property
    def started(self):
        """ True while the listener thread is alive."""
        return super(RestEventListener, self).is_alive()

    def run(self):
        """ Overrides Thread.run to continuously process events"""
        self.process_events()

    def process_events(self):
        """ Processes all new events and compares them with expected events.

        Polls roughly once per second until :meth:`stop` is called.
        Processed events are ignored next time.
        """
        # Event.wait() serves as the one-second polling delay and returns True as soon
        # as stop() sets the flag, so shutdown does not have to wait out a sleep.
        while not self._stop_event.wait(1):
            for exp_event in self._events_to_listen:
                # Skip if event has occurred
                if exp_event['first_event'] and exp_event['matched_events']:
                    continue

                # Fetching is inside the try as well: a failing REST call or target
                # lookup must be logged, not silently terminate the listener thread.
                try:
                    self._process_expected_event(exp_event)
                except Exception:
                    logger.exception("An exception during matching events occurred.")

                if self._stop_event.is_set():
                    break

    def _process_expected_event(self, exp_event):
        """ Fetches new events for one registration and records those that match."""
        matched_events = self.get_next_portion(exp_event['event'])
        if not matched_events:
            return

        for event_entity in matched_events:
            got_event = Event(self._appliance).build_from_entity(event_entity)
            if exp_event['event'].matches(got_event):
                if exp_event['callback']:
                    exp_event['callback'](exp_event=exp_event['event'],
                                          got_event=got_event)
                exp_event['matched_events'].append(got_event)
                # NOTE(review): this cursor is shared by all expected events, so advancing
                # it here may hide lower-id events from other registrations - confirm.
                self._last_processed_id = got_event.event_attrs['id'].value

    def get_next_portion(self, evt):
        """ Returns list with one or more events matched with expected event.

        Only events with an id greater than the last processed one are requested, further
        narrowed by those FILTER_ATTRS the expected event defines with a truthy value.

        Returns None if there is no matched events."""
        evt.process_id()

        q = Q('id', '>', self._last_processed_id)  # ensure we get only new events

        used_filters = set(self.FILTER_ATTRS).intersection(evt.event_attrs)
        for filter_attr in used_filters:
            evt_attr = evt.event_attrs[filter_attr]
            if evt_attr.value:
                q &= Q(filter_attr, '=', evt_attr.value)

        result = self.event_streams.filter(q)
        if len(result):
            return result

    @property
    def got_events(self):
        """ Returns the list of registrations (expected event plus the events matched to it).

        Also logs a summary of each expected event with its number of matches.
        """
        evts = [(evt['event'], len(evt['matched_events'])) for evt in self._events_to_listen]
        logger.info(evts)
        return self._events_to_listen

    def reset_events(self):
        """ Forgets all registered expected events and their matches."""
        self._events_to_listen = []

    def check_expected_events(self):
        """ Checks that all expected events have arrived (at least one match each)."""
        return all(event['matched_events'] for event in self.got_events)

    def __call__(self, *args, **kwargs):
        """
        it is called by register_event fixture.
        bad idea, to replace register_event by object later

        Builds an event from the arguments and registers it without a callback.
        ``first_event`` defaults to True here.
        """
        first_event = kwargs.pop('first_event', True)
        evt = self.new_event(*args, **kwargs)
        logger.info("registering event: {}".format(evt))
        self.listen_to(evt, callback=None, first_event=first_event)