Source code for clipper_admin.kubernetes.kubernetes_container_manager

from __future__ import absolute_import, division, print_function
from ..container_manager import (
    create_model_container_label, ContainerManager, CLIPPER_DOCKER_LABEL,
    CLIPPER_MODEL_CONTAINER_LABEL, CLIPPER_QUERY_FRONTEND_ID_LABEL,
    CLIPPER_INTERNAL_MANAGEMENT_PORT, CLIPPER_INTERNAL_QUERY_PORT,
    CLIPPER_INTERNAL_METRIC_PORT)
from ..exceptions import ClipperException
from .kubernetes_metric_utils import start_prometheus, CLIPPER_FRONTEND_EXPORTER_IMAGE

from contextlib import contextmanager
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.client import configuration, V1DeleteOptions
import logging
import json
import yaml
import os
import time

CLIPPER_QUERY_FRONTEND_DEPLOYMENT_LABEL = "ai.clipper.name=query-frontend"

logger = logging.getLogger(__name__)
cur_dir = os.path.dirname(os.path.abspath(__file__))


@contextmanager
def _pass_conflicts():
    try:
        yield
    except ApiException as e:
        body = json.loads(e.body)
        if body['reason'] == 'AlreadyExists':
            logger.info("{} already exists, skipping!".format(body['details']))
            pass
        else:
            raise e
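
# Usage sketch (hypothetical call): wrap any Kubernetes "create" call whose
# target may already exist, so that a 409 AlreadyExists response is logged
# and skipped instead of raised:
#
#   with _pass_conflicts():
#       k8s_v1.create_namespaced_config_map(body=cm_body, namespace=ns)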


class KubernetesContainerManager(ContainerManager):
    def __init__(self,
                 kubernetes_proxy_addr=None,
                 redis_ip=None,
                 redis_port=6379,
                 useInternalIP=False,
                 namespace='default',
                 create_namespace_if_not_exists=False):
        """
        Parameters
        ----------
        kubernetes_proxy_addr : str, optional
            The proxy address if you are proxying connections locally using
            ``kubectl proxy``. If this argument is provided, Clipper will
            construct the appropriate proxy URLs for accessing Clipper's
            Kubernetes services, rather than using the API server address
            provided in your kube config.
        redis_ip : str, optional
            The address of a running Redis cluster. If set to None, Clipper
            will start a Redis deployment for you.
        redis_port : int, optional
            The Redis port. If ``redis_ip`` is set to None, Clipper will
            start Redis on this port. If ``redis_ip`` is provided, Clipper
            will connect to Redis on this port.
        useInternalIP : bool, optional
            Use the internal IP of the K8S nodes. If ``useInternalIP`` is set
            to False, Clipper will throw an exception if none of the nodes
            have ExternalDNS. If ``useInternalIP`` is set to True, Clipper
            will use the internal IP of a K8S node if no ExternalDNS exists
            for any of the nodes.
        namespace : str, optional
            The Kubernetes namespace to use. If this argument is provided,
            all Clipper artifacts and resources will be created in this k8s
            namespace. If not, the "default" namespace is used.
        create_namespace_if_not_exists : bool, optional
            Create the k8s namespace if it doesn't already exist. If this
            argument is set to True and the k8s namespace does not exist, a
            new k8s namespace will be created.

        Note
        ----
        Clipper stores all persistent configuration state (such as registered
        application and model information) in Redis. If you want Clipper to
        be durable and able to recover from failures, we recommend
        configuring your own persistent and replicated Redis cluster rather
        than letting Clipper launch one for you.
        """
        if kubernetes_proxy_addr is not None:
            self.kubernetes_proxy_addr = kubernetes_proxy_addr
            self.use_k8s_proxy = True
        else:
            self.use_k8s_proxy = False

        self.redis_ip = redis_ip
        self.redis_port = redis_port
        self.useInternalIP = useInternalIP
        config.load_kube_config()
        configuration.assert_hostname = False
        self._k8s_v1 = client.CoreV1Api()
        self._k8s_beta = client.ExtensionsV1beta1Api()

        # Check if the namespace exists. If it doesn't and the create flag is
        # set, create the namespace; otherwise raise an error.
        namespaces = []
        for ns in self._k8s_v1.list_namespace().items:
            namespaces.append(ns.metadata.name)

        if namespace in namespaces:
            self.k8s_namespace = namespace
        elif create_namespace_if_not_exists:
            body = client.V1Namespace()
            body.metadata = client.V1ObjectMeta(name=namespace)
            try:
                self._k8s_v1.create_namespace(body)
            except ApiException as e:
                logger.error(
                    "Exception creating Kubernetes namespace: {}".format(e))
                raise ClipperException(
                    "Could not create Kubernetes namespace. "
                    "Reason: {}".format(e.reason))
            self.k8s_namespace = namespace
        else:
            msg = "Error connecting to Kubernetes cluster. Namespace does not exist"
            logger.error(msg)
            raise ClipperException(msg)
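    # Usage sketch (hypothetical values; assumes a cluster reachable through
    # the active kube config, and that "127.0.0.1:8001" is where a local
    # ``kubectl proxy`` is listening):
    #
    #   manager = KubernetesContainerManager(
    #       kubernetes_proxy_addr="127.0.0.1:8001",
    #       namespace="clipper",
    #       create_namespace_if_not_exists=True)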
    def start_clipper(self,
                      query_frontend_image,
                      mgmt_frontend_image,
                      cache_size,
                      num_frontend_replicas=1):
        self.num_frontend_replicas = num_frontend_replicas
        # If an existing Redis service isn't provided, start one
        if self.redis_ip is None:
            name = 'redis'
            with _pass_conflicts():
                self._k8s_beta.create_namespaced_deployment(
                    body=yaml.load(
                        open(
                            os.path.join(cur_dir,
                                         '{}-deployment.yaml'.format(name)))),
                    namespace=self.k8s_namespace)
            with _pass_conflicts():
                body = yaml.load(
                    open(
                        os.path.join(cur_dir,
                                     '{}-service.yaml'.format(name))))
                body["spec"]["ports"][0]["port"] = self.redis_port
                self._k8s_v1.create_namespaced_service(
                    body=body, namespace=self.k8s_namespace)
            time.sleep(10)

        for name, img in zip(['mgmt-frontend', 'query-frontend'],
                             [mgmt_frontend_image, query_frontend_image]):
            with _pass_conflicts():
                body = yaml.load(
                    open(
                        os.path.join(cur_dir,
                                     '{}-deployment.yaml'.format(name))))
                if self.redis_ip is not None:
                    args = [
                        "--redis_ip={}".format(self.redis_ip),
                        "--redis_port={}".format(self.redis_port)
                    ]
                    if name == 'query-frontend':
                        args.append(
                            "--prediction_cache_size={}".format(cache_size))
                    body["spec"]["template"]["spec"]["containers"][0][
                        "args"] = args
                body["spec"]["template"]["spec"]["containers"][0][
                    "image"] = img
                if name == 'query-frontend':
                    # Create multiple query frontends
                    for query_frontend_id in range(num_frontend_replicas):
                        # Create a single query frontend deployment
                        body['metadata']['name'] = 'query-frontend-{}'.format(
                            query_frontend_id)
                        body['spec']['template']['metadata']['labels'][
                            CLIPPER_QUERY_FRONTEND_ID_LABEL] = str(
                                query_frontend_id)
                        body['spec']['template']['spec']['containers'][1][
                            'image'] = CLIPPER_FRONTEND_EXPORTER_IMAGE
                        self._k8s_beta.create_namespaced_deployment(
                            body=body, namespace=self.k8s_namespace)

                        # Create a single query frontend rpc service. Don't
                        # confuse this with the query frontend service, which
                        # is created after the loop as a single user-facing
                        # service.
                        rpc_service_body = yaml.load(
                            open(
                                os.path.join(
                                    cur_dir,
                                    '{}-rpc-service.yaml'.format(name))))
                        rpc_service_body['metadata'][
                            'name'] = 'query-frontend-{}'.format(
                                query_frontend_id)
                        rpc_service_body['spec']['selector'][
                            CLIPPER_QUERY_FRONTEND_ID_LABEL] = str(
                                query_frontend_id)
                        self._k8s_v1.create_namespaced_service(
                            body=rpc_service_body,
                            namespace=self.k8s_namespace)
                else:
                    self._k8s_beta.create_namespaced_deployment(
                        body=body, namespace=self.k8s_namespace)

            with _pass_conflicts():
                body = yaml.load(
                    open(
                        os.path.join(cur_dir,
                                     '{}-service.yaml'.format(name))))
                self._k8s_v1.create_namespaced_service(
                    body=body, namespace=self.k8s_namespace)

        start_prometheus(self._k8s_v1, self._k8s_beta, self.k8s_namespace)

        self.connect()

    def connect(self):
        nodes = self._k8s_v1.list_node()

        external_node_hosts = []
        for node in nodes.items:
            for addr in node.status.addresses:
                if addr.type == "ExternalDNS":
                    external_node_hosts.append(addr.address)

        if len(external_node_hosts) == 0 and self.useInternalIP:
            msg = "No external node addresses found. Using Internal IP address"
            logger.warn(msg)
            for node in nodes.items:
                for addr in node.status.addresses:
                    if addr.type == "InternalIP":
                        external_node_hosts.append(addr.address)

        if len(external_node_hosts) == 0:
            msg = ("Error connecting to Kubernetes cluster. "
                   "No external node addresses found")
            logger.error(msg)
            raise ClipperException(msg)

        self.external_node_hosts = external_node_hosts
        logger.info("Found {num_nodes} nodes: {nodes}".format(
            num_nodes=len(external_node_hosts),
            nodes=", ".join(external_node_hosts)))

        try:
            mgmt_frontend_ports = self._k8s_v1.read_namespaced_service(
                name="mgmt-frontend",
                namespace=self.k8s_namespace).spec.ports
            for p in mgmt_frontend_ports:
                if p.name == "1338":
                    self.clipper_management_port = p.node_port
                    logger.info("Setting Clipper mgmt port to {}".format(
                        self.clipper_management_port))

            query_frontend_ports = self._k8s_v1.read_namespaced_service(
                name="query-frontend",
                namespace=self.k8s_namespace).spec.ports
            for p in query_frontend_ports:
                if p.name == "1337":
                    self.clipper_query_port = p.node_port
                    logger.info("Setting Clipper query port to {}".format(
                        self.clipper_query_port))
                elif p.name == "7000":
                    self.clipper_rpc_port = p.node_port

            query_frontend_deployments = self._k8s_beta.list_namespaced_deployment(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_QUERY_FRONTEND_DEPLOYMENT_LABEL).items
            self.num_frontend_replicas = len(query_frontend_deployments)

            metrics_ports = self._k8s_v1.read_namespaced_service(
                name="metrics", namespace=self.k8s_namespace).spec.ports
            for p in metrics_ports:
                if p.name == "9090":
                    self.clipper_metric_port = p.node_port
                    logger.info("Setting Clipper metric port to {}".format(
                        self.clipper_metric_port))
        except ApiException as e:
            logger.warn(
                "Exception connecting to Clipper Kubernetes cluster: {}".
                format(e))
            raise ClipperException(
                "Could not connect to Clipper Kubernetes cluster. "
                "Reason: {}".format(e))

    def deploy_model(self, name, version, input_type, image, num_replicas=1):
        with _pass_conflicts():
            for query_frontend_id in range(self.num_frontend_replicas):
                deployment_name = get_model_deployment_name(
                    name, version, query_frontend_id)
                body = {
                    'apiVersion': 'extensions/v1beta1',
                    'kind': 'Deployment',
                    'metadata': {
                        "name": deployment_name,
                        "labels": {
                            "test": "readiness"
                        },
                    },
                    'spec': {
                        'replicas': num_replicas,
                        'template': {
                            'metadata': {
                                'labels': {
                                    CLIPPER_MODEL_CONTAINER_LABEL:
                                    create_model_container_label(
                                        name, version),
                                    CLIPPER_DOCKER_LABEL: ""
                                },
                                'annotations': {
                                    "prometheus.io/scrape": "true",
                                    "prometheus.io/port": "1390",
                                    "test": "readiness",
                                }
                            },
                            'spec': {
                                'containers': [{
                                    'name': deployment_name,
                                    'image': image,
                                    'imagePullPolicy': 'Always',
                                    'readinessProbe': {
                                        'exec': {
                                            'command':
                                            ['cat', '/model_is_ready.check']
                                        },
                                        'initialDelaySeconds': 3,
                                        'periodSeconds': 3
                                    },
                                    'ports': [{
                                        'containerPort': 80
                                    }, {
                                        'containerPort': 1390
                                    }],
                                    'env': [{
                                        'name': 'CLIPPER_MODEL_NAME',
                                        'value': name
                                    }, {
                                        'name': 'CLIPPER_MODEL_VERSION',
                                        'value': str(version)
                                    }, {
                                        'name': 'CLIPPER_IP',
                                        'value': 'query-frontend-{}'.format(
                                            query_frontend_id)
                                    }, {
                                        'name': 'CLIPPER_INPUT_TYPE',
                                        'value': input_type
                                    }]
                                }]
                            }
                        }
                    }
                }
                self._k8s_beta.create_namespaced_deployment(
                    body=body, namespace=self.k8s_namespace)

                while self._k8s_beta.read_namespaced_deployment_status(
                        name=deployment_name,
                        namespace=self.k8s_namespace).status.available_replicas \
                        != num_replicas:
                    time.sleep(3)

    def get_num_replicas(self, name, version):
        deployment_name = get_model_deployment_name(
            name, version, query_frontend_id=0)
        response = self._k8s_beta.read_namespaced_deployment_scale(
            name=deployment_name, namespace=self.k8s_namespace)
        return response.spec.replicas

    def set_num_replicas(self, name, version, input_type, image,
                         num_replicas):
        # NOTE: assumes `metadata.name` can identify the model deployment.
        for query_frontend_id in range(self.num_frontend_replicas):
            deployment_name = get_model_deployment_name(
                name, version, query_frontend_id)
            self._k8s_beta.patch_namespaced_deployment_scale(
                name=deployment_name,
                namespace=self.k8s_namespace,
                body={
                    'spec': {
                        'replicas': num_replicas,
                    }
                })

            while self._k8s_beta.read_namespaced_deployment_status(
                    name=deployment_name,
                    namespace=self.k8s_namespace).status.available_replicas \
                    != num_replicas:
                time.sleep(3)
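    # Scaling sketch (hypothetical model name, version, and image): after
    # ``deploy_model`` creates one deployment per query frontend,
    # ``set_num_replicas`` patches each deployment's scale and blocks until
    # the replicas are available:
    #
    #   manager.deploy_model("mymodel", 1, "doubles",
    #                        "myrepo/mymodel-container:1", num_replicas=1)
    #   manager.set_num_replicas("mymodel", 1, "doubles",
    #                            "myrepo/mymodel-container:1", num_replicas=3)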
    def get_logs(self, logging_dir):
        logging_dir = os.path.abspath(os.path.expanduser(logging_dir))

        log_files = []
        if not os.path.exists(logging_dir):
            os.makedirs(logging_dir)
            logger.info("Created logging directory: %s" % logging_dir)

        for pod in self._k8s_v1.list_namespaced_pod(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_DOCKER_LABEL).items:
            for i, c in enumerate(pod.status.container_statuses):
                # log_file_name = "image_{image}:container_{id}.log".format(
                #     image=c.image_id, id=c.container_id)
                log_file_name = "{pod}_{num}.log".format(
                    pod=pod.metadata.name, num=str(i))
                log_file_alt = "{cname}.log".format(cname=c.name)
                logger.info("log file name: {}".format(log_file_name))
                logger.info("log alt file name: {}".format(log_file_alt))
                log_file = os.path.join(logging_dir, log_file_name)
                with open(log_file, "w") as lf:
                    lf.write(
                        self._k8s_v1.read_namespaced_pod_log(
                            namespace=self.k8s_namespace,
                            name=pod.metadata.name,
                            container=c.name))
                log_files.append(log_file)
        return log_files
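    # Usage sketch (hypothetical path): fetch the logs of every pod carrying
    # the Clipper label into a local directory:
    #
    #   files = manager.get_logs(logging_dir="~/clipper_logs")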
    def stop_models(self, models):
        # Stops all deployments of pods running Clipper models with the
        # specified names and versions.
        try:
            for m in models:
                for v in models[m]:
                    self._k8s_beta.delete_collection_namespaced_deployment(
                        namespace=self.k8s_namespace,
                        label_selector="{label}={val}".format(
                            label=CLIPPER_MODEL_CONTAINER_LABEL,
                            val=create_model_container_label(m, v)))
        except ApiException as e:
            logger.warn(
                "Exception deleting kubernetes deployments: {}".format(e))
            raise e
    def stop_all_model_containers(self):
        try:
            self._k8s_beta.delete_collection_namespaced_deployment(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_MODEL_CONTAINER_LABEL)
        except ApiException as e:
            logger.warn(
                "Exception deleting kubernetes deployments: {}".format(e))
            raise e
    def stop_all(self):
        logger.info("Stopping all running Clipper resources")

        try:
            for service in self._k8s_v1.list_namespaced_service(
                    namespace=self.k8s_namespace,
                    label_selector=CLIPPER_DOCKER_LABEL).items:
                service_name = service.metadata.name
                self._k8s_v1.delete_namespaced_service(
                    namespace=self.k8s_namespace,
                    name=service_name,
                    body=V1DeleteOptions())

            self._k8s_beta.delete_collection_namespaced_deployment(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_DOCKER_LABEL)

            self._k8s_beta.delete_collection_namespaced_replica_set(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_DOCKER_LABEL)

            self._k8s_v1.delete_collection_namespaced_replication_controller(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_DOCKER_LABEL)

            self._k8s_v1.delete_collection_namespaced_pod(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_DOCKER_LABEL)

            self._k8s_v1.delete_collection_namespaced_pod(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_MODEL_CONTAINER_LABEL)

            self._k8s_v1.delete_collection_namespaced_config_map(
                namespace=self.k8s_namespace,
                label_selector=CLIPPER_DOCKER_LABEL)
        except ApiException as e:
            logger.warn(
                "Exception deleting kubernetes resources: {}".format(e))
    def get_registry(self):
        return self.registry

    def get_admin_addr(self):
        if self.use_k8s_proxy:
            return ("{proxy_addr}/api/v1/namespaces/{ns}/"
                    "services/mgmt-frontend:{port}/proxy").format(
                        proxy_addr=self.kubernetes_proxy_addr,
                        ns=self.k8s_namespace,
                        port=CLIPPER_INTERNAL_MANAGEMENT_PORT)
        else:
            return "{host}:{port}".format(
                host=self.external_node_hosts[0],
                port=self.clipper_management_port)

    def get_query_addr(self):
        if self.use_k8s_proxy:
            return ("{proxy_addr}/api/v1/namespaces/{ns}/"
                    "services/query-frontend:{port}/proxy").format(
                        proxy_addr=self.kubernetes_proxy_addr,
                        ns=self.k8s_namespace,
                        port=CLIPPER_INTERNAL_QUERY_PORT)
        else:
            return "{host}:{port}".format(
                host=self.external_node_hosts[0],
                port=self.clipper_query_port)

    def get_metric_addr(self):
        if self.use_k8s_proxy:
            return ("{proxy_addr}/api/v1/namespaces/{ns}/"
                    "services/metrics:{port}/proxy").format(
                        proxy_addr=self.kubernetes_proxy_addr,
                        ns=self.k8s_namespace,
                        port=CLIPPER_INTERNAL_METRIC_PORT)
        else:
            return "{host}:{port}".format(
                host=self.external_node_hosts[0],
                port=self.clipper_metric_port)
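# Address sketch: with kubernetes_proxy_addr="127.0.0.1:8001" (a hypothetical
# local proxy) and the "default" namespace, get_query_addr() returns a URL of
# the form
#
#   127.0.0.1:8001/api/v1/namespaces/default/services/query-frontend:<port>/proxy
#
# where <port> is CLIPPER_INTERNAL_QUERY_PORT. Without a proxy, it returns
# "<first external node host>:<node port of the query-frontend service>".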
def get_model_deployment_name(name, version, query_frontend_id):
    return "{name}-{version}-deployment-at-{query_frontend_id}".format(
        name=name, version=version, query_frontend_id=query_frontend_id)
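# For example, get_model_deployment_name("mymodel", 1, 0) (a hypothetical
# model name and version) returns "mymodel-1-deployment-at-0".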