Source code for cfme.containers.provider.openshift

import attr
from cached_property import cached_property
from os import path
from six.moves.urllib.error import URLError
from wrapanapi.systems.container import Openshift

from cfme.common.provider import DefaultEndpoint
from cfme.control.explorer.alert_profiles import ProviderAlertProfile, NodeAlertProfile
from cfme.utils import ssh
from cfme.utils.log import logger
from cfme.utils.appliance.implementations.ui import navigate_to
from cfme.utils.ocp_cli import OcpCli
from cfme.utils.varmeth import variable
from cfme.utils.wait import wait_for, TimedOutError
from . import ContainersProvider, ContainersProviderDefaultEndpoint, ContainersProviderEndpointsForm


class CustomAttribute(object):
    def __init__(self, name, value, field_type=None, href=None):
        self.name = name
        self.value = value
        self.field_type = field_type
        self.href = href
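
# Illustrative sketch (not part of the original module): CustomAttribute is a plain
# value object that the provider methods further below translate into REST payloads.
# The attribute name/value here are hypothetical:
#
#     ca = CustomAttribute('environment', 'qa', field_type='String')
#     # ca.href stays None until the attribute exists on the appliance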


class OpenshiftDefaultEndpoint(ContainersProviderDefaultEndpoint):
    """Represents Openshift default endpoint"""

    @staticmethod
    def get_ca_cert(connection_info):
        """Getting OpenShift's certificate from the master machine.

        Args:
            connection_info (dict): username, password and hostname for OCP

        Returns:
            certificate's content.
        """
        with ssh.SSHClient(**connection_info) as provider_ssh:
            _, stdout, _ = provider_ssh.exec_command("cat /etc/origin/master/ca.crt")
            return str("".join(stdout.readlines()))
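
# Usage sketch (an assumption, not from the original source): get_ca_cert() only needs
# SSH connection details for the OCP master; the hostname and credentials below are
# placeholders.
#
#     cert = OpenshiftDefaultEndpoint.get_ca_cert(
#         {"username": "root", "password": "secret", "hostname": "ocp-master.example.com"})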


class ServiceBasedEndpoint(DefaultEndpoint):

    @property
    def view_value_mapping(self):
        out = {'hostname': self.hostname,
               'api_port': self.api_port,
               'sec_protocol': self.sec_protocol}

        if out['sec_protocol'] and self.sec_protocol.lower() == 'ssl trusting custom ca':
            out['trusted_ca_certificates'] = OpenshiftDefaultEndpoint.get_ca_cert(
                {"username": self.ssh_creds.principal,
                 "password": self.ssh_creds.secret,
                 "hostname": self.master_hostname})

        return out


class VirtualizationEndpoint(ServiceBasedEndpoint):
    """Represents virtualization Endpoint"""
    name = 'virtualization'

    @property
    def view_value_mapping(self):
        # values like host and port are taken from the Default endpoint
        # and are not editable in the Virtualization endpoint; only the token can be added
        return {'kubevirt_token': self.token}


class MetricsEndpoint(ServiceBasedEndpoint):
    """Represents metrics Endpoint"""
    name = 'metrics'


class AlertsEndpoint(ServiceBasedEndpoint):
    """Represents Alerts Endpoint"""
    name = 'alerts'


@attr.s(hash=False)
class OpenshiftProvider(ContainersProvider):
    num_route = ['num_route']
    STATS_TO_MATCH = ContainersProvider.STATS_TO_MATCH + num_route
    type_name = "openshift"
    mgmt_class = Openshift
    db_types = ["Openshift::ContainerManager"]
    endpoints_form = ContainersProviderEndpointsForm
    settings_key = 'ems_openshift'

    http_proxy = attr.ib(default=None)
    adv_http = attr.ib(default=None)
    adv_https = attr.ib(default=None)
    no_proxy = attr.ib(default=None)
    image_repo = attr.ib(default=None)
    image_reg = attr.ib(default=None)
    image_tag = attr.ib(default=None)
    cve_loc = attr.ib(default=None)
    virt_type = attr.ib(default=None)

    def create(self, **kwargs):
        # Enable alerts collection before adding the provider, to avoid missing an active
        # alert after the provider is added.
        # For more info: https://bugzilla.redhat.com/show_bug.cgi?id=1514950
        if self.appliance.version >= '5.9' and getattr(self, "alerts_type") == "Prometheus":
            alert_profiles = self.appliance.collections.alert_profiles
            provider_profile = alert_profiles.instantiate(ProviderAlertProfile,
                                                          "Prometheus Provider Profile")
            node_profile = alert_profiles.instantiate(NodeAlertProfile,
                                                      "Prometheus node Profile")
            for profile in [provider_profile, node_profile]:
                profile.assign_to("The Enterprise")
        super(OpenshiftProvider, self).create(**kwargs)

    @cached_property
    def cli(self):
        return OcpCli(self)

    def href(self):
        return self.appliance.rest_api.collections.providers\
            .find_by(name=self.name).resources[0].href

    @property
    def view_value_mapping(self):
        mapping = {'name': self.name,
                   'zone': self.zone,
                   'prov_type': ('OpenShift Container Platform' if self.appliance.is_downstream
                                 else 'OpenShift')}
        if self.appliance.version >= '5.9':
            mapping['metrics_type'] = self.metrics_type
            mapping['alerts_type'] = self.alerts_type
            mapping['proxy'] = {
                'http_proxy': self.http_proxy
            }
            mapping['advanced'] = {
                'adv_http': self.adv_http,
                'adv_https': self.adv_https,
                'no_proxy': self.no_proxy,
                'image_repo': self.image_repo,
                'image_reg': self.image_reg,
                'image_tag': self.image_tag,
                'cve_loc': self.cve_loc
            }
            if self.appliance.version >= '5.10':
                mapping['virt_type'] = self.virt_type
        else:
            mapping['metrics_type'] = None
            mapping['alerts_type'] = None
            mapping['virt_type'] = None
            mapping['proxy'] = None
            mapping['advanced'] = None

        return mapping

    @variable(alias='db')
    def num_route(self):
        return self._num_db_generic('container_routes')

    @num_route.variant('ui')
    def num_route_ui(self):
        view = navigate_to(self, "Details")
        return int(view.entities.summary("Relationships").get_text_of("Routes"))

    @variable(alias='db')
    def num_template(self):
        return self._num_db_generic('container_templates')

    @num_template.variant('ui')
    def num_template_ui(self):
        view = navigate_to(self, "Details")
        return int(view.entities.summary("Relationships").get_text_of("Container Templates"))

    @classmethod
    def from_config(cls, prov_config, prov_key):
        endpoints = {}
        token_creds = cls.process_credential_yaml_key(prov_config['credentials'],
                                                      cred_type='token')
        master_hostname = prov_config['endpoints']['default'].hostname
        ssh_creds = cls.process_credential_yaml_key(prov_config['ssh_creds'])
        for endp in prov_config['endpoints']:
            # Attach the master hostname and SSH credentials to each endpoint, so
            # get_ca_cert will be able to fetch the SSL cert from OCP for each endpoint
            setattr(prov_config['endpoints'][endp], "master_hostname", master_hostname)
            setattr(prov_config['endpoints'][endp], "ssh_creds", ssh_creds)
            if OpenshiftDefaultEndpoint.name == endp:
                prov_config['endpoints'][endp]['token'] = token_creds.token
                endpoints[endp] = OpenshiftDefaultEndpoint(**prov_config['endpoints'][endp])
            elif MetricsEndpoint.name == endp:
                endpoints[endp] = MetricsEndpoint(**prov_config['endpoints'][endp])
            elif AlertsEndpoint.name == endp:
                endpoints[endp] = AlertsEndpoint(**prov_config['endpoints'][endp])
            else:
                raise Exception('Unsupported endpoint type "{}".'.format(endp))

        settings = prov_config.get('settings', {})
        advanced = settings.get('advanced', {})
        http_proxy = settings.get('proxy', {}).get('http_proxy')
        adv_http, adv_https, no_proxy, image_repo, image_reg, image_tag, cve_loc = [
            advanced.get(field) for field in
            ('adv_http', 'adv_https', 'no_proxy', 'image_repo', 'image_reg',
             'image_tag', 'cve_loc')
        ]

        return cls.appliance.collections.containers_providers.instantiate(
            prov_class=cls,
            name=prov_config.get('name'),
            key=prov_key,
            zone=prov_config.get('server_zone'),
            metrics_type=prov_config.get('metrics_type'),
            alerts_type=prov_config.get('alerts_type'),
            endpoints=endpoints,
            provider_data=prov_config,
            http_proxy=http_proxy,
            adv_http=adv_http,
            adv_https=adv_https,
            no_proxy=no_proxy,
            image_repo=image_repo,
            image_reg=image_reg,
            image_tag=image_tag,
            cve_loc=cve_loc,
            virt_type=prov_config.get('virt_type'))
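
    # Rough shape of the provider YAML consumed by from_config() above, inferred only
    # from the keys that method reads (a sketch, not the canonical schema; all values
    # are placeholders):
    #
    #     name: my-openshift
    #     server_zone: default
    #     metrics_type: Hawkular
    #     alerts_type: Prometheus
    #     credentials: openshift-token     # resolved with cred_type='token'
    #     ssh_creds: openshift-ssh
    #     endpoints:
    #         default:
    #             hostname: ocp-master.example.com
    #         metrics:
    #             hostname: metrics.example.com
    #     settings:
    #         proxy:
    #             http_proxy: http://proxy.example.com:8080
    #         advanced:
    #             image_repo: registry.example.com/repo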

    def custom_attributes(self):
        """returns custom attributes"""
        response = self.appliance.rest_api.get(
            path.join(self.href(), 'custom_attributes'))
        out = []
        for attr_dict in response['resources']:
            attr = self.appliance.rest_api.get(attr_dict['href'])
            out.append(
                CustomAttribute(
                    attr['name'],
                    attr['value'],
                    (attr['field_type'] if 'field_type' in attr else None),
                    attr_dict['href']
                )
            )
        return out

    def add_custom_attributes(self, *custom_attributes):
        """Adding static custom attributes to provider.

        Args:
            custom_attributes: The custom attributes to add.

        Returns:
            response.
        """
        if not custom_attributes:
            raise TypeError('{} takes at least 1 argument.'
                            .format(self.add_custom_attributes.__name__))
        for c_attr in custom_attributes:
            if not isinstance(c_attr, CustomAttribute):
                raise TypeError('All arguments should be of type {}. ({} != {})'
                                .format(CustomAttribute, type(c_attr), CustomAttribute))
        payload = {
            "action": "add",
            "resources": [{
                "name": ca.name,
                "value": str(ca.value)
            } for ca in custom_attributes]}
        for i, fld_tp in enumerate([c_attr.field_type for c_attr in custom_attributes]):
            if fld_tp:
                payload['resources'][i]['field_type'] = fld_tp
        return self.appliance.rest_api.post(
            path.join(self.href(), 'custom_attributes'), **payload)

    def edit_custom_attributes(self, *custom_attributes):
        """Editing static custom attributes in provider.

        Args:
            custom_attributes: The custom attributes to edit.

        Returns:
            response.
        """
        if not custom_attributes:
            raise TypeError('{} takes at least 1 argument.'
                            .format(self.edit_custom_attributes.__name__))
        for c_attr in custom_attributes:
            if not isinstance(c_attr, CustomAttribute):
                raise TypeError('All arguments should be of type {}. ({} != {})'
                                .format(CustomAttribute, type(c_attr), CustomAttribute))
        attribs = self.custom_attributes()
        payload = {
            "action": "edit",
            "resources": [{
                # a list comprehension instead of filter(), which is not subscriptable
                # on Python 3
                "href": [attrib for attrib in attribs if attrib.name == ca.name][-1].href,
                "value": ca.value
            } for ca in custom_attributes]}
        return self.appliance.rest_api.post(
            path.join(self.href(), 'custom_attributes'), **payload)

    def delete_custom_attributes(self, *custom_attributes):
        """Deleting static custom attributes from provider.

        Args:
            custom_attributes: The custom attributes to delete.
                               (Could also be names (str))

        Returns:
            response.
        """
        names = []
        for c_attr in custom_attributes:
            attr_type = type(c_attr)
            if attr_type in (str, CustomAttribute):
                names.append(c_attr if attr_type is str else c_attr.name)
            else:
                raise TypeError('Type of arguments should be either '
                                'str or CustomAttribute. ({} not in [str, CustomAttribute])'
                                .format(type(c_attr)))
        attribs = self.custom_attributes()
        if not names:
            names = [attrib.name for attrib in attribs]
        payload = {
            "action": "delete",
            "resources": [{
                "href": attrib.href,
            } for attrib in attribs if attrib.name in names]}
        return self.appliance.rest_api.post(
            path.join(self.href(), 'custom_attributes'), **payload)
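
    # Usage sketch for the custom-attribute helpers above (assumes an already-added
    # provider instance; the attribute names and values are hypothetical):
    #
    #     provider.add_custom_attributes(CustomAttribute('environment', 'qa'))
    #     provider.edit_custom_attributes(CustomAttribute('environment', 'prod'))
    #     provider.delete_custom_attributes('environment')  # plain names (str) work too
    #     provider.custom_attributes()                      # -> list of CustomAttribute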

    def sync_ssl_certificate(self):
        """Sync the SSL certificate between the OCP provider and the CFME appliance.

        Returns:
            None
        """
        def _copy_certificate():
            is_succeed = True
            try:
                # Copy certificate to the appliance
                provider_ssh.get_file("/etc/origin/master/ca.crt", "/tmp/ca.crt")
                appliance_ssh.put_file("/tmp/ca.crt",
                                       "/etc/pki/ca-trust/source/anchors/{crt}".format(
                                           crt=cert_name))
            except URLError:
                logger.debug("Failed to deploy certificate from Openshift to CFME")
                is_succeed = False
            finally:
                return is_succeed

        provider_ssh = self.cli.ssh_client
        appliance_ssh = self.appliance.ssh_client()

        # Reconnect to the appliance in case the connection is dead
        if not appliance_ssh.connected:
            appliance_ssh.connect()

        # Check whether SSL is already configured between the appliance and the provider
        # by sending an HTTPS request (using SSL) from the appliance to the provider,
        # hiding the output and returning only the exit code of the command
        _, stdout, stderr = \
            appliance_ssh.exec_command(
                "curl https://{provider}:8443 -sS > /dev/null;echo $?".format(
                    provider=self.provider_data.hostname))

        # Only copy the certificate if the check failed (return code is not 0)
        if stdout.readline().replace('\n', "") != "0":
            cert_name = "{provider_name}.ca.crt".format(
                provider_name=self.provider_data.hostname.split(".")[0])
            wait_for(_copy_certificate, num_sec=600, delay=30,
                     message="Copy certificate from OCP to CFME")
            appliance_ssh.exec_command("update-ca-trust")

            # restart evmserverd to apply the new SSL certificate
            self.appliance.restart_evm_service()
            self.appliance.wait_for_evm_service()
            self.appliance.wait_for_web_ui()

    def get_system_id(self):
        mgmt_systems_tbl = self.appliance.db.client['ext_management_systems']
        return self.appliance.db.client.session.query(mgmt_systems_tbl).filter(
            mgmt_systems_tbl.name == self.name).first().id

    def get_metrics(self, **kwargs):
        """Returns all the collected metrics for this provider.

        Args:
            filters: dict of column names and values to filter by,
                     e.g. {"resource_type": "Container"}
                     (parent_ems_id is added automatically)
            metrics_table: metrics table name; there are a few metrics tables,
                           e.g. metrics, metric_rollups, etc.

        Returns:
            Query object with the relevant records
        """
        filters = kwargs.get("filters", {})
        metrics_table = kwargs.get("metrics_table", "metric_rollups")
        metrics_tbl = self.appliance.db.client[metrics_table]
        mgmt_system_id = self.get_system_id()
        logger.info("Getting metrics for {name} (parent_ems_id == {id})".format(
            name=self.name, id=mgmt_system_id))
        if filters:
            logger.info("Filtering by: {f}".format(f=filters))
        filters["parent_ems_id"] = mgmt_system_id
        return self.appliance.db.client.session.query(metrics_tbl).filter_by(**filters)
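
    # Example query sketch (an assumption): the filters dict maps directly to
    # filter_by() keyword arguments on the chosen metrics table, e.g.
    #
    #     rollups = provider.get_metrics(filters={"resource_type": "Container"},
    #                                    metrics_table="metric_rollups")
    #     rollups.count()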

    def wait_for_collected_metrics(self, timeout="50m", table_name="metrics"):
        """Check the DB until metrics collection data appears.

        Args:
            timeout: timeout in minutes
            table_name: name of the metrics table to check

        Returns:
            Bool: whether the collected metrics count is greater than 0
        """
        def is_collected():
            metrics_count = self.get_metrics(metrics_table=table_name).count()
            logger.info("Current metrics found count is {count}".format(count=metrics_count))
            return metrics_count > 0

        logger.info("Monitoring DB for metrics collection")
        result = True
        try:
            wait_for(is_collected, timeout=timeout, delay=30)
        except TimedOutError:
            logger.error(
                "Timeout exceeded, no metrics found in MIQ DB for the provider \"{name}\"".format(
                    name=self.name))
            result = False
        finally:
            return result
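
    # Typical wait sketch (an assumption): after adding the provider and enabling
    # metrics collection, a test might block until collection data shows up:
    #
    #     assert provider.wait_for_collected_metrics(timeout="50m", table_name="metrics")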