from __future__ import absolute_import, division, print_function
from ..container_manager import (
create_model_container_label, ContainerManager, CLIPPER_DOCKER_LABEL,
CLIPPER_MODEL_CONTAINER_LABEL, CLIPPER_INTERNAL_RPC_PORT,
CLIPPER_INTERNAL_MANAGEMENT_PORT, CLIPPER_INTERNAL_QUERY_PORT,
CLIPPER_INTERNAL_METRIC_PORT, CLIPPER_NAME_LABEL, ClusterAdapter)
from ..exceptions import ClipperException
from .kubernetes_metric_utils import PROM_VERSION
from contextlib import contextmanager
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.client import configuration, V1DeleteOptions
import logging
import json
import yaml
import os
import time
import jinja2
from jinja2.exceptions import TemplateNotFound
logger = logging.getLogger(__name__)
cur_dir = os.path.dirname(os.path.abspath(__file__))
CLUSTER_IP = 'ClusterIP'
NODE_PORT = 'NodePort'
LOAD_BALANCER = 'LoadBalancer'
EXTERNAL_NAME = 'ExternalName'
OFFICIAL_K8S_SERVICE_TYPE = [CLUSTER_IP, NODE_PORT, LOAD_BALANCER, EXTERNAL_NAME]
DEFAULT_CLIPPER_SERVICE_TYPES = {
'redis': NODE_PORT,
'management': NODE_PORT,
'query': NODE_PORT,
'query-rpc': NODE_PORT,
'metric': NODE_PORT
}
DUMMY_CLUSTER_NAME = 'cluster-name'
CONFIG_FILES = {
'k8s': {
'service_types': '{cluster_name}-k8s-service-types.yaml'.format(
cluster_name=DUMMY_CLUSTER_NAME)
},
'redis': {
'service': 'redis-service.yaml',
'deployment': 'redis-deployment.yaml'
},
'management': {
'service': 'mgmt-frontend-service.yaml',
'deployment': 'mgmt-frontend-deployment.yaml'
},
'query': {
'service': {
'query': 'query-frontend-service.yaml',
'rpc': 'query-frontend-rpc-service.yaml'
},
'deployment': 'query-frontend-deployment.yaml',
},
'metric': {
'service': 'prom_service.yaml',
'deployment': 'prom_deployment.yaml',
'config': 'prom_configmap.yaml'
},
'rbac': {
'clusterrole': 'rbac_cluster_role.yaml',
'clusterrolebinding': 'rbac_cluster_role_binding.yaml',
},
'model': {
'deployment': 'model-container-template.yaml'
}
}
@contextmanager
def _pass_conflicts():
try:
yield
except ApiException as e:
body = json.loads(e.body)
if body['reason'] == 'AlreadyExists':
logger.info("{} already exists, skipping!".format(body['details']))
pass
else:
raise e
[docs]class KubernetesContainerManager(ContainerManager):
[docs] def __init__(self,
cluster_name="default-cluster",
kubernetes_proxy_addr=None,
redis_ip=None,
redis_port=6379,
useInternalIP=False,
namespace='default',
service_types=None,
create_namespace_if_not_exists=False):
"""
Parameters
----------
cluster_name : str
A unique name for this Clipper cluster. This can be used to run multiple Clipper
clusters on the same Kubernetes cluster without interfering with each other.
Kubernetes cluster name must follow Kubernetes label value naming rule, namely:
Valid label values must be 63 characters or less and must be empty or begin and end with
an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.),
and alphanumerics between. See more at:
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
kubernetes_proxy_addr : str, optional
The proxy address if you are proxying connections locally using ``kubectl proxy``.
If this argument is provided, Clipper will construct the appropriate proxy
URLs for accessing Clipper's Kubernetes services, rather than using the API server
addres provided in your kube config.
redis_ip : str, optional
The address of a running Redis cluster. If set to None, Clipper will start
a Redis deployment for you.
redis_port : int, optional
The Redis port. If ``redis_ip`` is set to None, Clipper will start Redis on this port.
If ``redis_ip`` is provided, Clipper will connect to Redis on this port.
useInternalIP : bool, optional
Use Internal IP of the K8S nodes . If ``useInternalIP`` is set to False, Clipper will
throw an exception if none of the nodes have ExternalDNS.
If ``useInternalIP`` is set to true, Clipper will use the Internal IP of the K8S node
if no ExternalDNS exists for any of the nodes.
namespace: str, optional
The Kubernetes namespace to use .
If this argument is provided, all Clipper artifacts and resources will be created in this
k8s namespace. If not "default" namespace is used.
service_types: dict, optional
Specify what kind of Kubernetes service you want.
You must use predefined 'ServiceTypes' in Kubernetes as value. See more at:
https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
For example, service_types = {
'redis': 'NodePort',
'management': 'LoadBalancer',
'query': 'LoadBalancer',
'query-rpc': 'ClusterIP',
'metric': 'LoadBalancer'
}
create_namespace_if_not_exists: bool, False
Create a k8s namespace if the namespace doesnt already exist.
If this argument is provided and the k8s namespace does not exist a new k8s namespace will
be created.
Note
----
Clipper stores all persistent configuration state (such as registered application and model
information) in Redis. If you want Clipper to be durable and able to recover from failures,
we recommend configuring your own persistent and replicated Redis cluster rather than
letting Clipper launch one for you.
"""
self.cluster_name = cluster_name
if kubernetes_proxy_addr is not None:
self.kubernetes_proxy_addr = kubernetes_proxy_addr
self.use_k8s_proxy = True
else:
self.use_k8s_proxy = False
self.redis_ip = redis_ip
self.redis_port = redis_port
self.useInternalIP = useInternalIP
config.load_kube_config()
configuration.assert_hostname = False
self._k8s_v1 = client.CoreV1Api()
self._k8s_beta = client.ExtensionsV1beta1Api()
self._k8s_rbac = client.RbacAuthorizationV1beta1Api()
# Create the template engine
# Config: Any variable missing -> Error
self.template_engine = jinja2.Environment(
loader=jinja2.FileSystemLoader(cur_dir, followlinks=True),
undefined=jinja2.StrictUndefined)
# Check if namespace exists and if create flag set ...create the namespace or throw error
namespaces = []
for ns in self._k8s_v1.list_namespace().items:
namespaces.append(ns.metadata.name)
if namespace in namespaces:
self.k8s_namespace = namespace
elif create_namespace_if_not_exists:
body = client.V1Namespace()
body.metadata = client.V1ObjectMeta(name=namespace)
try:
self._k8s_v1.create_namespace(body)
except ApiException as e:
logging.error(
"Exception creating Kubernetes namespace: {}".format(e))
raise ClipperException(
"Could not create Kubernetes namespace. "
"Reason: {}".format(e.reason))
self.k8s_namespace = namespace
else:
msg = "Error connecting to Kubernetes cluster. Namespace does not exist. You can pass in KubernetesContainerManager(create_namespace_if_not_exists=True) to crate this namespcae"
logger.error(msg)
raise ClipperException(msg)
# Initialize logger with cluster identifier
if self.k8s_namespace != "default":
self.cluster_identifier = "{cluster}".format(
cluster=self.cluster_name)
else:
self.cluster_identifier = "{ns}-{cluster}".format(
ns=self.k8s_namespace, cluster=self.cluster_name)
self.logger = ClusterAdapter(logger, {
'cluster_name': self.cluster_identifier
})
self.service_types = self._determine_service_types(service_types)
def _determine_service_types(self, st):
yaml_file_name = CONFIG_FILES['k8s']['service_types'].replace(
DUMMY_CLUSTER_NAME, self.cluster_identifier)
res = DEFAULT_CLIPPER_SERVICE_TYPES
if st is None:
try:
res = self._generate_config(yaml_file_name)
except TemplateNotFound:
res = DEFAULT_CLIPPER_SERVICE_TYPES
else:
if not isinstance(st, dict):
raise ClipperException(
"service_types must be 'dict' type: {}".format(st))
if set(st.keys()) != set(DEFAULT_CLIPPER_SERVICE_TYPES.keys()):
raise ClipperException(
"service_types has unknown keys: {}".format(st.keys()))
if any(v not in OFFICIAL_K8S_SERVICE_TYPE for v in set(st.values())):
raise ClipperException(
"service_types has unknown values: {}".format(st.values()))
if EXTERNAL_NAME in st.values():
raise ClipperException(
"Clipper does not support '{}' service".format(EXTERNAL_NAME))
res.update(st)
with open(os.path.join(cur_dir, yaml_file_name), 'w') as f:
yaml.dump(res, f)
logging.info("Your service_types are {}".format(res))
return res
def start_clipper(self,
query_frontend_image,
mgmt_frontend_image,
frontend_exporter_image,
cache_size,
qf_http_thread_pool_size,
qf_http_timeout_request,
qf_http_timeout_content,
num_frontend_replicas=1):
self._config_rbac()
self._start_redis()
self._start_mgmt(mgmt_frontend_image)
self.num_frontend_replicas = num_frontend_replicas
self._start_query(query_frontend_image, frontend_exporter_image,
cache_size, qf_http_thread_pool_size,
qf_http_timeout_request, qf_http_timeout_content,
num_frontend_replicas)
self._start_prometheus()
self.connect()
def _start_redis(self, sleep_time=5):
# If an existing Redis service isn't provided, start one
if self.redis_ip is None:
deployment_name = 'redis-at-{cluster_name}'.format(
cluster_name=self.cluster_name)
with _pass_conflicts():
self._k8s_beta.create_namespaced_deployment(
body=self._generate_config(
CONFIG_FILES['redis']['deployment'],
deployment_name=deployment_name,
cluster_name=self.cluster_name),
namespace=self.k8s_namespace)
with _pass_conflicts():
body = self._generate_config(
CONFIG_FILES['redis']['service'],
deployment_name=deployment_name,
public_redis_port=self.redis_port,
cluster_name=self.cluster_name,
service_type=self.service_types['redis'],
)
self._k8s_v1.create_namespaced_service(
body=body, namespace=self.k8s_namespace)
time.sleep(sleep_time)
# Wait for max 10 minutes
wait_count = 0
while self._k8s_beta.read_namespaced_deployment(
name=deployment_name,
namespace=self.k8s_namespace).status.available_replicas != 1:
time.sleep(3)
wait_count += 3
if wait_count > 600:
raise ClipperException(
"Could not create a Kubernetes deployment: {}".format(deployment_name))
self.redis_ip = deployment_name
def _start_mgmt(self, mgmt_image):
with _pass_conflicts():
mgmt_deployment_data = self._generate_config(
CONFIG_FILES['management']['deployment'],
image=mgmt_image,
redis_service_host=self.redis_ip,
redis_service_port=self.redis_port,
cluster_name=self.cluster_name)
self._k8s_beta.create_namespaced_deployment(
body=mgmt_deployment_data, namespace=self.k8s_namespace)
with _pass_conflicts():
mgmt_service_data = self._generate_config(
CONFIG_FILES['management']['service'],
cluster_name=self.cluster_name,
service_type=self.service_types['management'])
self._k8s_v1.create_namespaced_service(
body=mgmt_service_data, namespace=self.k8s_namespace)
def _start_query(self, query_image, frontend_exporter_image, cache_size,
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_replicas):
for query_frontend_id in range(num_replicas):
with _pass_conflicts():
query_deployment_data = self._generate_config(
CONFIG_FILES['query']['deployment'],
image=query_image,
exporter_image=frontend_exporter_image,
redis_service_host=self.redis_ip,
redis_service_port=self.redis_port,
cache_size=cache_size,
thread_pool_size=qf_http_thread_pool_size,
timeout_request=qf_http_timeout_request,
timeout_content=qf_http_timeout_content,
name='query-frontend-{}'.format(query_frontend_id),
id_label=str(query_frontend_id),
cluster_name=self.cluster_name)
self._k8s_beta.create_namespaced_deployment(
body=query_deployment_data, namespace=self.k8s_namespace)
with _pass_conflicts():
query_rpc_service_data = self._generate_config(
CONFIG_FILES['query']['service']['rpc'],
name='query-frontend-{}'.format(query_frontend_id),
id_label=str(query_frontend_id),
cluster_name=self.cluster_name,
service_type=self.service_types['query-rpc'],
)
self._k8s_v1.create_namespaced_service(
body=query_rpc_service_data, namespace=self.k8s_namespace)
with _pass_conflicts():
query_frontend_service_data = self._generate_config(
CONFIG_FILES['query']['service']['query'],
cluster_name=self.cluster_name,
service_type=self.service_types['query'])
self._k8s_v1.create_namespaced_service(
body=query_frontend_service_data, namespace=self.k8s_namespace)
def _start_prometheus(self):
with _pass_conflicts():
configmap_data = self._generate_config(
CONFIG_FILES['metric']['config'],
cluster_name=self.cluster_name)
self._k8s_v1.create_namespaced_config_map(
body=configmap_data, namespace=self.k8s_namespace)
with _pass_conflicts():
deployment_data = self._generate_config(
CONFIG_FILES['metric']['deployment'],
version=PROM_VERSION,
cluster_name=self.cluster_name,
)
self._k8s_beta.create_namespaced_deployment(
body=deployment_data, namespace=self.k8s_namespace)
with _pass_conflicts():
service_data = self._generate_config(
CONFIG_FILES['metric']['service'],
cluster_name=self.cluster_name,
service_type=self.service_types['metric'],
)
self._k8s_v1.create_namespaced_service(
body=service_data, namespace=self.k8s_namespace)
def _config_rbac(self):
with _pass_conflicts():
clusterrole_data = self._generate_config(
CONFIG_FILES['rbac']['clusterrole'],
cluster_name=self.cluster_name, namespace=self.k8s_namespace)
self._k8s_rbac.create_cluster_role(
body=clusterrole_data)
with _pass_conflicts():
clusterrolebinding_data = self._generate_config(
CONFIG_FILES['rbac']['clusterrolebinding'],
cluster_name=self.cluster_name, namespace=self.k8s_namespace)
self._k8s_rbac.create_cluster_role_binding(
body=clusterrolebinding_data)
def _generate_config(self, file_path, **kwargs):
template = self.template_engine.get_template(file_path)
rendered = template.render(**kwargs)
parsed = yaml.load(rendered, Loader=yaml.FullLoader)
return parsed
def connect(self):
if any(v in [CLUSTER_IP, NODE_PORT] for v in self.service_types.values()):
nodes = self._k8s_v1.list_node()
external_node_hosts = []
for node in nodes.items:
for addr in node.status.addresses:
if addr.type == "ExternalDNS":
external_node_hosts.append(addr.address)
if len(external_node_hosts) == 0 and self.useInternalIP:
msg = "No external node addresses found. Using Internal IP address"
self.logger.warning(msg)
for node in nodes.items:
for addr in node.status.addresses:
if addr.type == "InternalIP":
external_node_hosts.append(addr.address)
if len(external_node_hosts) == 0:
msg = "Error connecting to Kubernetes cluster. No external node addresses found. You may pass in KubernetesContainerManager(useInternalIP=True) to connect to local Kubernetes cluster"
self.logger.error(msg)
raise ClipperException(msg)
self.external_node_hosts = external_node_hosts
self.logger.info("Found {num_nodes} nodes: {nodes}".format(
num_nodes=len(external_node_hosts),
nodes=", ".join(external_node_hosts)))
try:
v1service = self._k8s_v1.read_namespaced_service(
name="mgmt-frontend-at-{cluster_name}".format(
cluster_name=self.cluster_name),
namespace=self.k8s_namespace)
mgmt_frontend_ports = v1service.spec.ports
for p in mgmt_frontend_ports:
if int(p.name) == CLIPPER_INTERNAL_MANAGEMENT_PORT:
self.clipper_management_port = p.node_port
if self.service_types['management'] in [CLUSTER_IP, NODE_PORT]:
self.logger.info("Setting Clipper mgmt port to {port}".format(
port=self.clipper_management_port))
elif self.service_types['management'] == LOAD_BALANCER:
self.clipper_management_ip = v1service.status.load_balancer.ingress[0].ip
self.logger.info("Setting Clipper mgmt port to {ip}:{port}"
.format(ip=self.clipper_management_ip,
port=self.clipper_management_port))
else:
msg = "Unknown service_type of management: {}".format(
self.service_types['management'])
raise ClipperException(msg)
v1service = self._k8s_v1.read_namespaced_service(
name="query-frontend-at-{cluster_name}".format(
cluster_name=self.cluster_name),
namespace=self.k8s_namespace)
query_frontend_ports = v1service.spec.ports
for p in query_frontend_ports:
if int(p.name) == CLIPPER_INTERNAL_QUERY_PORT:
self.clipper_query_port = p.node_port
elif int(p.name) == CLIPPER_INTERNAL_RPC_PORT:
self.clipper_rpc_port = p.node_port
if self.service_types['query'] in [CLUSTER_IP, NODE_PORT]:
self.logger.info("Setting Clipper query port to {}".format(
self.clipper_query_port))
elif self.service_types['query'] == LOAD_BALANCER:
self.clipper_query_ip = v1service.status.load_balancer.ingress[0].ip
self.logger.info("Setting Clipper query port to {ip}:{port}"
.format(ip=self.clipper_query_ip,
port=self.clipper_query_port))
else:
msg = "Unknown service_type of query: {}".format(
self.service_types['query'])
raise ClipperException(msg)
query_frontend_deployments = self._k8s_beta.list_namespaced_deployment(
namespace=self.k8s_namespace,
label_selector=
"{name_label}=query-frontend, {cluster_label}={cluster_name}".
format(
name_label=CLIPPER_NAME_LABEL,
cluster_label=CLIPPER_DOCKER_LABEL,
cluster_name=self.cluster_name)).items
self.num_frontend_replicas = len(query_frontend_deployments)
v1service = self._k8s_v1.read_namespaced_service(
name="metrics-at-{cluster_name}".format(
cluster_name=self.cluster_name),
namespace=self.k8s_namespace)
metrics_ports = v1service.spec.ports
for p in metrics_ports:
if p.name == "9090":
self.clipper_metric_port = p.node_port
if self.service_types['metric'] in [CLUSTER_IP, NODE_PORT]:
self.logger.info("Setting Clipper metric port to {port}".format(
port=self.clipper_metric_port))
elif self.service_types['metric'] == LOAD_BALANCER:
self.clipper_metric_ip = v1service.status.load_balancer.ingress[0].ip
self.logger.info("Setting Clipper metric port to {ip}:{port}"
.format(ip=self.clipper_metric_ip,
port=self.clipper_metric_port))
else:
msg = "Unknown service_type of metric: {}".format(
self.service_types['metric'])
raise ClipperException(msg)
except ApiException as e:
logging.warning(
"Exception connecting to Clipper Kubernetes cluster: {}".
format(e))
raise ClipperException(
"Could not connect to Clipper Kubernetes cluster. "
"Reason: {}".format(e))
def deploy_model(self, name, version, input_type, image, num_replicas=1):
for query_frontend_id in range(self.num_frontend_replicas):
deployment_name = get_model_deployment_name(
name, version, query_frontend_id, self.cluster_name)
generated_body = self._generate_config(
CONFIG_FILES['model']['deployment'],
deployment_name=deployment_name,
num_replicas=num_replicas,
container_label=create_model_container_label(name, version),
model_name=name,
version=version,
query_frontend_id=query_frontend_id,
input_type=input_type,
image=image,
cluster_name=self.cluster_name)
with _pass_conflicts():
self._k8s_beta.create_namespaced_deployment(
body=generated_body, namespace=self.k8s_namespace)
# Wait for max 10 minutes
wait_count = 0
while self._k8s_beta.read_namespaced_deployment(
name=deployment_name, namespace=self.k8s_namespace).status.available_replicas \
!= num_replicas:
time.sleep(3)
wait_count += 3
if wait_count > 600:
raise ClipperException(
"Could not create a Kubernetes deployment. "
"Model: {}-{} Image: {}".format(name, version, image))
def get_num_replicas(self, name, version):
deployment_name = get_model_deployment_name(
name, version, query_frontend_id=0, cluster_name=self.cluster_name)
response = self._k8s_beta.read_namespaced_deployment_scale(
name=deployment_name, namespace=self.k8s_namespace)
return response.spec.replicas
def set_num_replicas(self, name, version, input_type, image, num_replicas):
# NOTE: assumes `metadata.name` can identify the model deployment.
for query_frontend_id in range(self.num_frontend_replicas):
deployment_name = get_model_deployment_name(
name, version, query_frontend_id, self.cluster_name)
self._k8s_beta.patch_namespaced_deployment_scale(
name=deployment_name,
namespace=self.k8s_namespace,
body={
'spec': {
'replicas': num_replicas,
}
})
# Wait for max 10 minutes
wait_count = 0
while self._k8s_beta.read_namespaced_deployment(
name=deployment_name, namespace=self.k8s_namespace).status.available_replicas \
!= num_replicas:
time.sleep(3)
wait_count += 3
if wait_count > 600:
raise ClipperException(
"Could not update scale of the specified Deployment. "
"Model: {}-{} Image: {}".format(name, version, image))
[docs] def get_logs(self, logging_dir):
logging_dir = os.path.abspath(os.path.expanduser(logging_dir))
log_files = []
if not os.path.exists(logging_dir):
os.makedirs(logging_dir)
self.logger.info("Created logging directory: %s" % logging_dir)
for pod in self._k8s_v1.list_namespaced_pod(
namespace=self.k8s_namespace,
label_selector=CLIPPER_DOCKER_LABEL).items:
for i, c in enumerate(pod.status.container_statuses):
log_file_name = "{pod}_{num}.log".format(
pod=pod.metadata.name, num=str(i))
self.logger.info("log file name: {}".format(log_file_name))
log_file = os.path.join(logging_dir, log_file_name)
with open(log_file, "w") as lf:
lf.write(
self._k8s_v1.read_namespaced_pod_log(
namespace=self.k8s_namespace,
name=pod.metadata.name,
container=c.name))
log_files.append(log_file)
return log_files
[docs] def stop_models(self, models):
# Stops all deployments of pods running Clipper models with the specified
# names and versions.
try:
for m in models:
for v in models[m]:
label_selector="{label}={val}, {cluster_label}={cluster_name}".format(
label=CLIPPER_MODEL_CONTAINER_LABEL,
val=create_model_container_label(m, v),
cluster_label=CLIPPER_DOCKER_LABEL,
cluster_name=self.cluster_name)
self._k8s_beta.delete_collection_namespaced_deployment(
namespace=self.k8s_namespace,
label_selector=label_selector)
self._k8s_beta.delete_collection_namespaced_replica_set(
namespace=self.k8s_namespace,
label_selector=label_selector)
self._k8s_v1.delete_collection_namespaced_pod(
namespace=self.k8s_namespace,
label_selector=label_selector)
except ApiException as e:
self.logger.warning(
"Exception deleting kubernetes deployments: {}".format(e))
raise e
def stop_all_model_containers(self):
try:
label_selector="{label}, {cluster_label}={cluster_name}".format(
label=CLIPPER_MODEL_CONTAINER_LABEL,
cluster_label=CLIPPER_DOCKER_LABEL,
cluster_name=self.cluster_name)
self._k8s_beta.delete_collection_namespaced_deployment(
namespace=self.k8s_namespace,
label_selector=label_selector)
self._k8s_beta.delete_collection_namespaced_replica_set(
namespace=self.k8s_namespace,
label_selector=label_selector)
self._k8s_v1.delete_collection_namespaced_pod(
namespace=self.k8s_namespace,
label_selector=label_selector)
except ApiException as e:
self.logger.warning(
"Exception deleting kubernetes deployments: {}".format(e))
raise e
[docs] def stop_all(self, graceful=True):
self.logger.info("Stopping all running Clipper resources")
cluster_selector = "{cluster_label}={cluster_name}".format(
cluster_label=CLIPPER_DOCKER_LABEL, cluster_name=self.cluster_name)
try:
for service in self._k8s_v1.list_namespaced_service(
namespace=self.k8s_namespace,
label_selector=cluster_selector).items:
service_name = service.metadata.name
self._k8s_v1.delete_namespaced_service(
namespace=self.k8s_namespace,
name=service_name,
body=V1DeleteOptions())
self._k8s_beta.delete_collection_namespaced_deployment(
namespace=self.k8s_namespace, label_selector=cluster_selector)
self._k8s_beta.delete_collection_namespaced_replica_set(
namespace=self.k8s_namespace, label_selector=cluster_selector)
self._k8s_v1.delete_collection_namespaced_replication_controller(
namespace=self.k8s_namespace, label_selector=cluster_selector)
self._k8s_v1.delete_collection_namespaced_pod(
namespace=self.k8s_namespace, label_selector=cluster_selector)
self._k8s_v1.delete_collection_namespaced_config_map(
namespace=self.k8s_namespace, label_selector=cluster_selector)
self._k8s_rbac.delete_collection_cluster_role(
label_selector=cluster_selector)
self._k8s_rbac.delete_collection_cluster_role_binding(
label_selector=cluster_selector)
except ApiException as e:
logging.warning(
"Exception deleting kubernetes resources: {}".format(e))
try:
yaml_file_name = CONFIG_FILES['k8s']['service_types'].replace(
DUMMY_CLUSTER_NAME, self.cluster_identifier)
os.remove(os.path.join(cur_dir, yaml_file_name))
except OSError:
pass
def get_admin_addr(self):
if self.service_types['management'] == LOAD_BALANCER:
return "{host}:{port}".format(host=self.clipper_management_ip,
port=self.clipper_management_port)
if self.use_k8s_proxy:
return ("{proxy_addr}/api/v1/namespaces/{ns}/"
"services/mgmt-frontend-at-{cluster}:{port}/proxy").format(
proxy_addr=self.kubernetes_proxy_addr,
ns=self.k8s_namespace,
cluster=self.cluster_name,
port=CLIPPER_INTERNAL_MANAGEMENT_PORT)
else:
return "{host}:{port}".format(
host=self.external_node_hosts[0],
port=self.clipper_management_port)
def get_query_addr(self):
if self.service_types['query'] == LOAD_BALANCER:
return "{host}:{port}".format(host=self.clipper_query_ip,
port=self.clipper_query_port)
if self.use_k8s_proxy:
return (
"{proxy_addr}/api/v1/namespaces/{ns}/"
"services/query-frontend-at-{cluster}:{port}/proxy").format(
proxy_addr=self.kubernetes_proxy_addr,
ns=self.k8s_namespace,
cluster=self.cluster_name,
port=CLIPPER_INTERNAL_QUERY_PORT)
else:
return "{host}:{port}".format(
host=self.external_node_hosts[0], port=self.clipper_query_port)
def get_metric_addr(self):
if self.service_types['metric'] == LOAD_BALANCER:
return "{host}:{port}".format(host=self.clipper_metric_ip,
port=self.clipper_metric_port)
if self.use_k8s_proxy:
return ("{proxy_addr}/api/v1/namespaces/{ns}/"
"services/metrics-at-{cluster}:{port}/proxy").format(
proxy_addr=self.kubernetes_proxy_addr,
ns=self.k8s_namespace,
cluster=self.cluster_name,
port=CLIPPER_INTERNAL_METRIC_PORT)
else:
return "{host}:{port}".format(
host=self.external_node_hosts[0],
port=self.clipper_metric_port)
def get_model_deployment_name(name, version, query_frontend_id, cluster_name):
return "{name}-{version}-deployment-at-{query_frontend_id}-at-{cluster_name}".format(
name=name,
version=version,
query_frontend_id=query_frontend_id,
cluster_name=cluster_name)