Source code for cfme.containers.provider.openshift

from cached_property import cached_property
from os import path

from wrapanapi.containers.providers.openshift import Openshift

from . import ContainersProvider

from cfme.containers.provider import (
    ContainersProviderDefaultEndpoint, ContainersProviderEndpointsForm
)
from cfme.common.provider import DefaultEndpoint
from cfme.utils.ocp_cli import OcpCli
from cfme.utils.path import data_path
from cfme.utils.varmeth import variable
from cfme.utils import version


[docs]class CustomAttribute(object): def __init__(self, name, value, field_type=None, href=None): self.name = name self.value = value self.field_type = field_type self.href = href
[docs]class OpenshiftDefaultEndpoint(ContainersProviderDefaultEndpoint): """Represents Openshift default endpoint""" @staticmethod
[docs] def get_ca_cert(): """Getting OpenShift's certificate from the master machine. Args: No args. returns: certificate's content. """ cert_file_path = path.join(str(data_path), 'cert-auths', 'cmqe-tests-openshift-signer.crt') with open(cert_file_path) as f: return f.read()
[docs]class HawkularEndpoint(DefaultEndpoint): """Represents Hawkular Endpoint""" name = 'hawkular' @property def view_value_mapping(self): out = { 'hostname': self.hostname, 'api_port': self.api_port, 'sec_protocol': self.sec_protocol } if out['sec_protocol'] and self.sec_protocol.lower() == 'ssl trusting custom ca': out['trusted_ca_certificates'] = OpenshiftDefaultEndpoint.get_ca_cert() return out
[docs]class AlertsEndpoint(DefaultEndpoint): """Represents Alerts Endpoint""" name = 'alerts' @property def view_value_mapping(self): out = {} out['hostname'] = version.pick({ version.LOWEST: None, '5.9': self.hostname}) out['api_port'] = version.pick({ version.LOWEST: None, '5.9': self.api_port}) out['sec_protocol'] = version.pick({ version.LOWEST: None, '5.9': self.sec_protocol}) if out['sec_protocol'] and self.sec_protocol.lower() == 'ssl trusting custom ca': out['trusted_ca_certificates'] = OpenshiftDefaultEndpoint.get_ca_cert() return out
[docs]class OpenshiftProvider(ContainersProvider): num_route = ['num_route'] STATS_TO_MATCH = ContainersProvider.STATS_TO_MATCH + num_route type_name = "openshift" mgmt_class = Openshift db_types = ["Openshift::ContainerManager"] endpoints_form = ContainersProviderEndpointsForm def __init__( self, name=None, key=None, zone=None, metrics_type=None, alerts_type=None, provider_data=None, endpoints=None, appliance=None, http_proxy=None, adv_http=None, adv_https=None, no_proxy=None, image_repo=None, image_reg=None, image_tag=None, cve_loc=None): self.http_proxy = http_proxy self.adv_http = adv_http self.adv_https = adv_https self.no_proxy = no_proxy self.image_repo = image_repo self.image_reg = image_reg self.image_tag = image_tag self.cve_loc = cve_loc super(OpenshiftProvider, self).__init__( name=name, key=key, zone=zone, metrics_type=metrics_type, provider_data=provider_data, alerts_type=alerts_type, endpoints=endpoints, appliance=appliance) @cached_property def cli(self): return OcpCli(self)
[docs] def href(self): return self.appliance.rest_api.collections.providers\ .find_by(name=self.name).resources[0].href
@property def view_value_mapping(self): mapping = { 'name': self.name, 'zone': self.zone } mapping['prov_type'] = ( 'OpenShift Container Platform' if self.appliance.is_downstream else 'OpenShift') if self.appliance.version >= '5.9': mapping['metrics_type'] = self.metrics_type mapping['alerts_type'] = self.alerts_type mapping['proxy'] = { 'http_proxy': self.http_proxy } mapping['advanced'] = { 'adv_http': self.adv_http, 'adv_https': self.adv_https, 'no_proxy': self.no_proxy, 'image_repo': self.image_repo, 'image_reg': self.image_reg, 'image_tag': self.image_tag, 'cve_loc': self.cve_loc } else: mapping['metrics_type'] = None mapping['alerts_type'] = None mapping['proxy'] = None mapping['advanced'] = None return mapping @variable(alias='db') def num_route(self): return self._num_db_generic('container_routes') @num_route.variant('ui') def num_route_ui(self): return int(self.get_detail("Relationships", "Routes")) @variable(alias='db') def num_template(self): return self._num_db_generic('container_templates') @num_template.variant('ui') def num_template_ui(self): return int(self.get_detail("Relationships", "Container Templates")) @classmethod
[docs] def from_config(cls, prov_config, prov_key, appliance=None): endpoints = {} token_creds = cls.process_credential_yaml_key(prov_config['credentials'], cred_type='token') for endp in prov_config['endpoints']: if OpenshiftDefaultEndpoint.name == endp: prov_config['endpoints'][endp]['token'] = token_creds.token endpoints[endp] = OpenshiftDefaultEndpoint(**prov_config['endpoints'][endp]) elif HawkularEndpoint.name == endp: endpoints[endp] = HawkularEndpoint(**prov_config['endpoints'][endp]) elif AlertsEndpoint.name == endp: endpoints[endp] = AlertsEndpoint(**prov_config['endpoints'][endp]) # TODO Add Prometheus and logic for having to select or the other based on metrcis_type else: raise Exception('Unsupported endpoint type "{}".'.format(endp)) settings = prov_config.get('settings', {}) advanced = settings.get('advanced', {}) http_proxy = settings.get('proxy', {}).get('http_proxy') adv_http, adv_https, no_proxy, image_repo, image_reg, image_tag, cve_loc = [ advanced.get(field) for field in ('adv_http', 'adv_https', 'no_proxy', 'image_repo', 'image_reg', 'image_tag', 'cve_loc') ] return cls( name=prov_config.get('name'), key=prov_key, zone=prov_config.get('server_zone'), metrics_type=prov_config.get('metrics_type'), alerts_type=prov_config.get('alerts_type'), endpoints=endpoints, provider_data=prov_config, appliance=appliance, http_proxy=http_proxy, adv_http=adv_http, adv_https=adv_https, no_proxy=no_proxy, image_repo=image_repo, image_reg=image_reg, image_tag=image_tag, cve_loc=cve_loc )
[docs] def custom_attributes(self): """returns custom attributes""" response = self.appliance.rest_api.get( path.join(self.href(), 'custom_attributes')) out = [] for attr_dict in response['resources']: attr = self.appliance.rest_api.get(attr_dict['href']) out.append( CustomAttribute( attr['name'], attr['value'], (attr['field_type'] if 'field_type' in attr else None), attr_dict['href'] ) ) return out
[docs] def add_custom_attributes(self, *custom_attributes): """Adding static custom attributes to provider. Args: custom_attributes: The custom attributes to add. returns: response. """ if not custom_attributes: raise TypeError('{} takes at least 1 argument.' .format(self.add_custom_attributes.__name__)) for attr in custom_attributes: if not isinstance(attr, CustomAttribute): raise TypeError('All arguments should be of type {}. ({} != {})' .format(CustomAttribute, type(attr), CustomAttribute)) payload = { "action": "add", "resources": [{ "name": ca.name, "value": str(ca.value) } for ca in custom_attributes]} for i, fld_tp in enumerate([attr.field_type for attr in custom_attributes]): if fld_tp: payload['resources'][i]['field_type'] = fld_tp return self.appliance.rest_api.post( path.join(self.href(), 'custom_attributes'), **payload)
[docs] def edit_custom_attributes(self, *custom_attributes): """Editing static custom attributes in provider. Args: custom_attributes: The custom attributes to edit. returns: response. """ if not custom_attributes: raise TypeError('{} takes at least 1 argument.' .format(self.edit_custom_attributes.__name__)) for attr in custom_attributes: if not isinstance(attr, CustomAttribute): raise TypeError('All arguments should be of type {}. ({} != {})' .format(CustomAttribute, type(attr), CustomAttribute)) attribs = self.custom_attributes() payload = { "action": "edit", "resources": [{ "href": filter(lambda attr: attr.name == ca.name, attribs)[-1].href, "value": ca.value } for ca in custom_attributes]} return self.appliance.rest_api.post( path.join(self.href(), 'custom_attributes'), **payload)
[docs] def delete_custom_attributes(self, *custom_attributes): """Deleting static custom attributes from provider. Args: custom_attributes: The custom attributes to delete. (Could be also names (str)) Returns: response. """ names = [] for attr in custom_attributes: attr_type = type(attr) if attr_type in (str, CustomAttribute): names.append(attr if attr_type is str else attr.name) else: raise TypeError('Type of arguments should be either' 'str or CustomAttribute. ({} not in [str, CustomAttribute])' .format(type(attr))) attribs = self.custom_attributes() if not names: names = [attr.name for attr in attribs] payload = { "action": "delete", "resources": [{ "href": attr.href, } for attr in attribs if attr.name in names]} return self.appliance.rest_api.post( path.join(self.href(), 'custom_attributes'), **payload)