# -*- coding: utf-8 -*-
"""Library for event testing.
"""
from cached_property import cached_property
from contextlib import contextmanager
from collections import Iterable
from datetime import datetime
from numbers import Number
from sqlalchemy.sql.expression import func
from time import sleep
from threading import Thread, Event as ThreadEvent
from cfme.utils.log import create_sublogger
logger = create_sublogger('events')
class EventAttr(object):
    """
    Contains one event attribute and the method for comparing it.

    Args:
        attr_type: explicit type of the attribute value. When omitted (or falsy) the type of
            the passed value is used.
        cmp_func: optional callable ``cmp_func(own_value, other_value)`` used by
            :meth:`match` instead of plain equality.
        **attrs: exactly one ``name=value`` pair describing the attribute.

    Raises:
        ValueError: if anything other than exactly one ``name=value`` pair is passed.
    """
    def __init__(self, attr_type=None, cmp_func=None, **attrs):
        # Exactly one pair is required; zero pairs used to surface as an obscure IndexError.
        if len(attrs) != 1:
            raise ValueError('event attribute can have only one key=value pair')
        # next(iter(...)) works both on Python 2 lists and on Python 3 dict views.
        self.name, self.value = next(iter(attrs.items()))
        self.type = attr_type or type(self.value)
        self.cmp_func = cmp_func

    def match(self, attr):
        """
        Compares the current attribute with the passed attribute.

        Args:
            attr: :class:`EventAttr` with the same name as this one.

        Returns:
            ``True`` when both values are ``None``; ``False`` when only one of them is
            ``None``; otherwise the result of ``cmp_func(self.value, attr.value)`` if a
            comparison function was given, else ``self.value == attr.value``.

        Raises:
            ValueError: if ``attr`` is not an :class:`EventAttr` or has a different name.
        """
        if not isinstance(attr, EventAttr) or self.name != attr.name:
            raise ValueError('Incorrect attribute is passed')

        # None means "no value" and only matches another missing value.
        # Other falsy values (0, '', False) are real values and must be compared normally,
        # otherwise two equal falsy values would never match.
        if attr.value is None or self.value is None:
            return attr.value is None and self.value is None
        if self.cmp_func:
            return self.cmp_func(self.value, attr.value)
        return self.value == attr.value

    def __repr__(self):
        return "{name}({type})={val}, cmp_func {cmp}".format(name=self.name, type=self.type,
                                                             val=self.value, cmp=self.cmp_func)
# FIXME: would it be better to create an event prototype and just clone it?
class Event(object):
    """
    Represents either a db event received by CFME and stored in event_streams or an
    expected event.

    Args:
        event_tool: helper object. This class iterates its ``event_streams_attributes`` as
            ``(name, type)`` pairs and calls its ``process_id(target_type, target_name)``.
        *args: :class:`EventAttr` objects to add to the event. Anything else is ignored
            with a warning.
    """
    def __init__(self, event_tool, *args):
        self._tool = event_tool

        # default attributes and their types: name -> EventAttr holding None
        self._default_attrs = {}
        self._populate_defaults()

        # container for the attributes actually set on this event: name -> EventAttr
        self.event_attrs = {}

        for arg in args:
            if isinstance(arg, EventAttr):
                self.add_attrs(arg)
            else:
                logger.warning("arg {} doesn't belong to EventAttr. ignoring it".format(arg))

    def __repr__(self):
        params = ", ".join("{}={}".format(attr.name, attr.value)
                           for attr in self.event_attrs.values())
        return "BaseEvent({})".format(params)

    def _populate_defaults(self):
        """Creates one empty (``None``) typed attribute per name reported by the tool."""
        for attr_name, attr_type in self._tool.event_streams_attributes:
            self._default_attrs[attr_name] = EventAttr(**{attr_name: None,
                                                          'attr_type': attr_type})

    def _parse_raw_event(self, evt):
        """Copies every default attribute from the raw db record ``evt`` into this event."""
        for attr in self._default_attrs:
            default_type = self._default_attrs[attr].type
            evt_value = getattr(evt, attr)
            evt_type = type(evt_value)
            # getattr sometimes returns a value whose type differs from the expected
            # python type, so the value is force-converted. Falsy values are left as is.
            if evt_value and evt_type is not default_type:
                # NOTE: ``unicode`` exists on Python 2 only, which this module targets.
                if evt_type is unicode:
                    evt_value = evt_value.encode('utf8')
                else:
                    evt_value = default_type(evt_value)
            self.add_attrs(EventAttr(**{attr: evt_value}))

    def _is_raw_event(self, evt):
        """Tells whether ``evt`` is a record of the ``event_streams`` table."""
        return evt.__tablename__ == 'event_streams'

    def matches(self, evt):
        """
        Compares the current event with the passed event.

        Only attributes present in both events are compared; events without common
        attributes therefore match. If this event has ``target_name`` but no ``target_id``,
        the id is looked up through the tool (using ``target_type`` and ``target_name``)
        and cached in ``event_attrs`` before comparing.

        Args:
            evt: :class:`Event` to compare with.

        Returns:
            ``True`` if all common attributes match, ``False`` otherwise or when the
            target id cannot be resolved yet.

        Raises:
            ValueError: if ``evt`` is not an instance of this event's class.
            KeyError: if ``target_name`` is set without ``target_type``.
        """
        if not isinstance(evt, type(self)):
            raise ValueError("passed event doesn't belong to {}".format(type(self)))

        if 'target_name' in self.event_attrs and 'target_id' not in self.event_attrs:
            try:
                target_id = self._tool.process_id(self.event_attrs['target_type'].value,
                                                  self.event_attrs['target_name'].value)
            except ValueError:
                # vm or host name isn't added to db yet. need to wait
                return False
            self.event_attrs['target_id'] = EventAttr(**{'target_id': target_id})

        # checking only common attributes
        common_attrs = set(self.event_attrs).intersection(evt.event_attrs)
        return all(self.event_attrs[name].match(evt.event_attrs[name])
                   for name in common_attrs)

    def add_attrs(self, *attrs):
        """
        An event consists of attributes like event_type, etc.
        This method adds one or more attributes to the event.

        Attributes whose name is neither ``target_name`` nor one of the default attribute
        names are skipped with a warning.

        Args:
            *attrs: :class:`EventAttr` objects to add.

        Returns:
            This event, so calls can be chained.
        """
        for attr in attrs:
            # 'target_name' is an artificial attribute which is converted to target_id
            # during matching. For default attributes no type check is done because
            # sqlalchemy's python_type and the type of returned values are different.
            if attr.name == 'target_name' or attr.name in self._default_attrs:
                self.event_attrs[attr.name] = attr
            else:
                logger.warning('The attribute {} type {} is absent in DB '
                               'or type mismatch.'.format(attr.name, attr.type))
        return self

    def build_from_raw_event(self, evt):
        """
        Helper method which takes a raw event from event_streams and prepares the event
        object. Objects which are not event_streams records are ignored.

        Returns:
            This event.
        """
        if self._is_raw_event(evt):
            self._parse_raw_event(evt)
        return self
class DbEventListener(Thread):
    """
    Accepts "expected" events, listens to db events and compares the events that show up
    with the expected ones. Runs the callback function of an expected event if it has one.

    Args:
        appliance: appliance object handed over to ``EventTool``.
    """
    def __init__(self, appliance):
        super(DbEventListener, self).__init__()
        self._appliance = appliance
        self._tool = EventTool(self._appliance)

        # one dict per expected event: 'event', 'callback', 'matched_events', 'first_event'
        self._events_to_listen = []
        # Id of the last already-seen event. It is used in queries to keep events with
        # this id and earlier out. None means that no event has been seen yet.
        self._last_processed_id = None
        self._stop_event = ThreadEvent()

    def set_last_record(self, evt=None):
        """
        Remembers the id of the last processed event.

        Args:
            evt: processed :class:`Event`; its ``id`` attribute value is stored. When
                omitted, the biggest id currently present in event_streams is stored.
        """
        if evt:
            self._last_processed_id = evt.event_attrs['id'].value
        else:
            # scalar() yields the bare max id (None for an empty table) instead of the
            # one-column row which one() returns.
            self._last_processed_id = self._tool.query(
                func.max(self._tool.event_streams.id)).scalar()

    def new_event(self, *attrs, **kwattrs):
        """
        This method just simplifies "expected" event creation.

        Usage:

            listener = appliance.event_listener()
            evt = listener.new_event(target_type='VmOrTemplate',
                                     target_name='my_lovely_vm',
                                     event_type='vm_create')
            listener.listen_to(evt)

        Args:
            *attrs: dicts which are unpacked into :class:`EventAttr` keyword arguments.
            **kwattrs: ``name=value`` pairs, each turned into one :class:`EventAttr`.

        Returns:
            The newly created :class:`Event`.
        """
        event = Event(event_tool=self._tool)
        for name, value in kwattrs.items():
            event.add_attrs(EventAttr(**{name: value}))
        for attr in attrs:
            event.add_attrs(EventAttr(**attr))
        return event

    def listen_to(self, *evts, **kwargs):
        """
        Accepts one or many events.
        The callback function is called when a matching event arrives in event_streams;
        it receives the expected event and the received event as ``exp_event`` and
        ``got_event`` keyword arguments.

        Args:
            evts: events which the listener should listen to
            callback: callback function that will be called if an event is received
            first_event: the listener waits only for the first event of such type and
                ignores such events afterwards once the first match is found.
                By default the listener collects all matching events.

        Raises:
            ValueError: if one of the passed objects is not an :class:`Event`. Events
                preceding it in the call stay registered.
        """
        callback = kwargs.get('callback')
        # if first_event is true, these expected events won't be checked after first match
        first_event = bool(kwargs.get('first_event'))

        for evt in evts:
            if not isinstance(evt, Event):
                raise ValueError("one of events doesn't belong to Event class")
            logger.info("event {} is added to listening queue".format(evt))
            self._events_to_listen.append({'event': evt,
                                           'callback': callback,
                                           'matched_events': [],
                                           'first_event': first_event})

    def start(self):
        """Records the current last event id and starts the listening thread."""
        logger.info('Event Listener has been started')
        self.set_last_record()
        self._stop_event.clear()
        super(DbEventListener, self).start()

    def stop(self):
        """Asks the processing loop to finish; does not wait for the thread to exit."""
        logger.info('Event Listener has been stopped')
        self._stop_event.set()

    def run(self):
        self.process_events()

    @property
    def started(self):
        return super(DbEventListener, self).is_alive()

    def process_events(self):
        """
        Processes all new db events and compares them with the expected events.
        Processed events are ignored next time. Loops until :meth:`stop` is called.
        """
        while not self._stop_event.is_set():
            events = self.get_next_portion()
            if not events:
                # nothing new yet, poll again a bit later
                sleep(0.2)
                continue

            for raw_event in events:
                logger.debug("processing event id {}".format(raw_event.id))
                got_event = Event(event_tool=self._tool).build_from_raw_event(raw_event)
                for exp_event in self._events_to_listen:
                    if exp_event['first_event'] and len(exp_event['matched_events']) > 0:
                        continue

                    if exp_event['event'].matches(got_event):
                        if exp_event['callback']:
                            exp_event['callback'](exp_event=exp_event['event'],
                                                  got_event=got_event)
                        exp_event['matched_events'].append(got_event)
                # the event is marked as processed even if nothing matched it
                self.set_last_record(got_event)
                if self._stop_event.is_set():
                    break

    @property
    def got_events(self):
        """
        Returns the list of dicts holding the expected events and all the events matched
        to them. Also logs each expected event with its number of matches.
        """
        evts = [(evt['event'], len(evt['matched_events'])) for evt in self._events_to_listen]
        logger.info(evts)
        return self._events_to_listen

    def reset_matches(self):
        """Forgets the matched events while keeping the expected ones."""
        for event in self._events_to_listen:
            event['matched_events'] = []

    def reset_events(self):
        """Forgets all expected events together with their matches."""
        self._events_to_listen = []

    def get_next_portion(self):
        """Returns the event_streams records newer than the last processed one, by id."""
        logger.debug("obtaining next portion of events")
        events = self._tool.query(self._tool.event_streams)
        # Without a known last id every record is new. Comparing the id column with None
        # would presumably render as a NULL comparison and select nothing.
        if self._last_processed_id is not None:
            events = events.filter(self._tool.event_streams.id > self._last_processed_id)
        return events.order_by(self._tool.event_streams.id).yield_per(100).all()

    def check_expected_events(self):
        """Returns ``True`` if every expected event has at least one matched event."""
        return all(len(event['matched_events']) for event in self.got_events)

    def __call__(self, *args, **kwargs):
        """
        It is called by the register_event fixture.
        Bad idea; register_event is to be replaced by the object later.

        Builds an event out of the passed arguments (see :meth:`new_event`) and registers
        it without a callback. Unlike :meth:`listen_to`, ``first_event`` defaults to True.
        """
        first_event = kwargs.pop('first_event', True)
        evt = self.new_event(*args, **kwargs)
        logger.info("registering event: {}".format(evt))
        self.listen_to(evt, callback=None, first_event=first_event)