Source code for cfme.configure.tasks

# -*- coding: utf-8 -*-

""" Module dealing with Configure/Tasks section.
"""
import re
from navmazing import NavigateToAttribute
from widgetastic.utils import Version, VersionPick
from widgetastic.widget import View
from widgetastic_patternfly import Dropdown, Tab

from cfme.base.login import BaseLoggedInPage
from cfme.exceptions import TaskFailedException
from cfme.utils.appliance import Navigatable
from cfme.utils.appliance.implementations.ui import navigator, CFMENavigateStep, navigate_to
from cfme.utils.log import logger
from cfme.utils.wait import wait_for, TimedOutError
from widgetastic_manageiq import BootstrapSelect, Button, CheckboxSelect, Table


table_loc = '//div[@id="gtl_div"]//table'

TABS_DATA_PER_PROVIDER = {
    'container': {
        'tab': 'AllTasks',
        'task': '{}',
        'state': 'finished'
    },
    'vm': {
        'tab': 'AllTasks',
        'task': 'Scan from Vm {}',
        'state': 'finished'
    },
    'host': {
        'tab': 'MyOtherTasks',
        'task': "SmartState Analysis for '{}'",
        'state': 'Finished'
    },
    'datastore': {
        'tab': 'MyOtherTasks',
        'task': 'SmartState Analysis for \[{}\]',
        'state': "Finished"
    },
    'cluster': {
        'tab': 'MyOtherTasks',
        'task': 'SmartState Analysis for \[{}\]',
        'state': "Finished"}
}


[docs]def is_vm_analysis_finished(name, **kwargs): return is_analysis_finished(name=name, task_type='vm', **kwargs)
[docs]def is_host_analysis_finished(name, **kwargs): return is_analysis_finished(name=name, task_type='host', **kwargs)
[docs]def is_datastore_analysis_finished(name, **kwargs): return is_analysis_finished(name=name, task_type='datastore', **kwargs)
[docs]def is_cluster_analysis_finished(name, **kwargs): return is_analysis_finished(name=name, task_type='cluster', **kwargs)
[docs]def delete_all_tasks(destination): view = navigate_to(Tasks, destination) view.delete.item_select('Delete All', handle_alert=True)
[docs]def is_analysis_finished(name, task_type='vm', clear_tasks_after_success=True): """ Check if analysis is finished - if not, reload page""" return check_tasks_have_no_errors(name, task_type, expected_num_of_tasks=1, silent_failure=False, clear_tasks_after_success=clear_tasks_after_success)
[docs]def are_all_tasks_match_status(name, expected_num_of_tasks, task_type): """ Check if all tasks states are finished - if not, reload page""" tabs_data = TABS_DATA_PER_PROVIDER[task_type] return all_tasks_match_status( destination=tabs_data['tab'], task_name=tabs_data['task'].format(name), expected_status=tabs_data['state'], expected_num_of_tasks=expected_num_of_tasks )
[docs]def all_tasks_match_status(destination, task_name, expected_status, expected_num_of_tasks): """ Check if all tasks with same task name states are finished - if not, reload page""" view = navigate_to(Tasks, destination) tab_view = getattr(view.tabs, destination.lower()) # task_name change from str to support also regular expression pattern task_name = re.compile(task_name) # expected_status change from str to support also regular expression pattern expected_status = re.compile(expected_status, re.IGNORECASE) try: rows = list(tab_view.table.rows(task_name=task_name, state=expected_status)) except IndexError: logger.warn('IndexError exception suppressed when searching for task row, no match found.') return False # check state = finished for all tasks return expected_num_of_tasks == len(rows), len(rows)
[docs]def check_tasks_have_no_errors(task_name, task_type, expected_num_of_tasks, silent_failure=False, clear_tasks_after_success=False): """ Check if all tasks analysis match state with no errors""" tabs_data = TABS_DATA_PER_PROVIDER[task_type] destination = tabs_data['tab'] task_name = tabs_data['task'].format(task_name) expected_status = tabs_data['state'] view = navigate_to(Tasks, destination) tab_view = getattr(view.tabs, destination.lower()) # task_name change from str to support also regular expression pattern task_name = re.compile(task_name) # expected_status change from str to support also regular expression pattern expected_status = re.compile(expected_status, re.IGNORECASE) wait_for( lambda: tab_view.table.is_displayed, timeout=10, delay=2, fail_func=view.reload.click ) try: rows = list(tab_view.table.rows(task_name=task_name, state=expected_status)) except IndexError: logger.warn('IndexError exception suppressed when searching for task row, no match found.') return False # check state for all tasks if expected_num_of_tasks != len(rows): logger.warn('There is no match between expected number of tasks "{}",' ' and number of tasks on state "{}'.format(expected_num_of_tasks, expected_status)) return False # throw exception if error in message for row in rows: message = row.message.text.lower() for term in ('error', 'timed out', 'failed', 'unable to run openscap'): if term in message: if silent_failure: logger.warning("Task {} error: {}".format(row.task_name.text, message)) return False elif term == 'timed out': raise TimedOutError("Task {} timed out: {}".format(row.task_name.text, message)) else: raise TaskFailedException(task_name=row.task_name.text, message=message) if clear_tasks_after_success: # Remove all finished tasks so they wouldn't poison other tests delete_all_tasks(destination) return True
[docs]def wait_analysis_finished_multiple_tasks( task_name, task_type, expected_num_of_tasks, delay=5, timeout='5M'): """ Wait until analysis is finished (or timeout exceeded)""" row_completed = [] # get view for reload button view = navigate_to(Tasks, 'AllTasks') def tasks_finished(output_rows, task_name, task_type, expected_num_of_tasks): is_succeed, num_of_succeed_tasks = are_all_tasks_match_status( task_name, expected_num_of_tasks, task_type) output_rows.append(num_of_succeed_tasks) return is_succeed try: wait_for(tasks_finished, func_kwargs={'output_rows': row_completed, 'task_name': task_name, 'task_type': task_type, 'expected_num_of_tasks': expected_num_of_tasks}, delay=delay, timeout=timeout, fail_func=view.reload.click) return row_completed[-1] except TimedOutError as e: logger.error("Only {} Tasks out of {}, Finished".format(row_completed[-1], expected_num_of_tasks)) raise TimedOutError('exception {}'.format(e))
[docs]class TasksView(BaseLoggedInPage): # Toolbar delete = Dropdown('Delete Tasks') # dropdown just has icon, use element title reload = Button(title=VersionPick({Version.lowest(): 'Reload the current display', '5.9': 'Refresh this page'})) @View.nested class tabs(View): # noqa # Extra Toolbar # Only on 'All' type tabs, but for access it doesn't make sense to access the tab for a # toolbar button cancel = Button(title='Cancel the selected task') # Form Buttons apply = Button('Apply') reset = Button('Reset') default = Button('Default') # Filters zone = BootstrapSelect(id='chosen_zone') period = BootstrapSelect(id='time_period') user = BootstrapSelect(id='user_choice') # This checkbox search_root captures all the filter options # It will break for status if/when there is second checkbox selection field added # It's the lowest level div with an id that captures the status checkboxes status = CheckboxSelect(search_root='tasks_options_div') state = BootstrapSelect(id='state_choice') @View.nested class mytasks(Tab): # noqa TAB_NAME = VersionPick({Version.lowest(): 'My VM and Container Analysis Tasks', '5.9': 'My Tasks'}) table = Table(table_loc) @View.nested class myothertasks(Tab): # noqa TAB_NAME = VersionPick({'5.9': 'My Tasks', Version.lowest(): 'My Other UI Tasks'}) table = Table(table_loc) @View.nested class alltasks(Tab): # noqa TAB_NAME = VersionPick({'5.9': 'All Tasks', Version.lowest(): "All VM and Container Analysis Tasks"}) table = Table(table_loc) @View.nested class allothertasks(Tab): # noqa TAB_NAME = "All Other Tasks" table = Table(table_loc) @property def is_displayed(self): return ( self.tabs.mytasks.is_displayed and self.tabs.myothertasks.is_displayed and self.tabs.alltasks.is_displayed and self.tabs.allothertasks.is_displayed)
[docs]class Tasks(Navigatable): pass
@navigator.register(Tasks, 'MyTasks')
[docs]class MyTasks(CFMENavigateStep): VIEW = TasksView prerequisite = NavigateToAttribute('appliance.server', 'Tasks')
[docs] def step(self, *args, **kwargs): self.view.tabs.mytasks.select()
[docs] def am_i_here(self): tasks = self.view.tabs.mytasks return tasks.is_displayed and tasks.is_active()
@navigator.register(Tasks, 'MyOtherTasks')
[docs]class MyOtherTasks(CFMENavigateStep): VIEW = TasksView prerequisite = NavigateToAttribute('appliance.server', 'Tasks')
[docs] def step(self, *args, **kwargs): self.view.tabs.myothertasks.select()
[docs] def am_i_here(self): tasks = self.view.tabs.myothertasks return tasks.is_displayed and tasks.is_active()
@navigator.register(Tasks, 'AllTasks')
[docs]class AllTasks(CFMENavigateStep): VIEW = TasksView prerequisite = NavigateToAttribute('appliance.server', 'Tasks')
[docs] def step(self, *args, **kwargs): self.view.tabs.alltasks.select()
[docs] def am_i_here(self): tasks = self.view.tabs.alltasks return tasks.is_displayed and tasks.is_active()
@navigator.register(Tasks, 'AllOtherTasks')
[docs]class AllOtherTasks(CFMENavigateStep): VIEW = TasksView prerequisite = NavigateToAttribute('appliance.server', 'Tasks')
[docs] def step(self, *args, **kwargs): self.view.tabs.allothertasks.select()
[docs] def am_i_here(self): tasks = self.view.tabs.allothertasks return tasks.is_displayed and tasks.is_active()