Source code for clipper_admin.docker.docker_container_manager

from __future__ import absolute_import, division, print_function

import socket

import random
import docker
import docker.errors
import logging
import os
import tempfile
from ..container_manager import (
    create_model_container_label, parse_model_container_label,
    ContainerManager, CLIPPER_DOCKER_LABEL, CLIPPER_MODEL_CONTAINER_LABEL,
    CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL,
    CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL, CLIPPER_INTERNAL_RPC_PORT,
    CLIPPER_INTERNAL_MANAGEMENT_PORT, CLIPPER_INTERNAL_QUERY_PORT,
    CLIPPER_INTERNAL_METRIC_PORT, CLIPPER_INTERNAL_REDIS_PORT,
    CLIPPER_DOCKER_PORT_LABELS, CLIPPER_METRIC_CONFIG_LABEL, ClusterAdapter,
    CLIPPER_FLUENTD_CONFIG_LABEL, CLIPPER_INTERNAL_FLUENTD_PORT)
from ..exceptions import ClipperException
from .docker_metric_utils import (
    run_query_frontend_metric_image,
    setup_metric_config,
    run_metric_image,
    add_to_metric_config,
    delete_from_metric_config
)
from .logging.docker_logging_utils import (
    get_logs_from_containers,
    get_default_log_config
)
from .docker_api_utils import (
    create_network,
    check_container_status,
    list_containers,
    run_container
)
from clipper_admin.docker.logging.fluentd import Fluentd


logger = logging.getLogger(__name__)


class DockerContainerManager(ContainerManager):
    def __init__(self,
                 cluster_name="default-cluster",
                 docker_ip_address="localhost",
                 use_centralized_log=False,
                 fluentd_port=CLIPPER_INTERNAL_FLUENTD_PORT,
                 clipper_query_port=1337,
                 clipper_management_port=1338,
                 clipper_rpc_port=7000,
                 redis_ip=None,
                 redis_port=6379,
                 prometheus_port=9090,
                 docker_network="clipper_network",
                 extra_container_kwargs=None):
        """
        Parameters
        ----------
        cluster_name : str
            A unique name for this Clipper cluster. This can be used to run
            multiple Clipper clusters on the same node without interfering
            with each other.
        docker_ip_address : str, optional
            The public hostname or IP address at which the Clipper Docker
            containers can be accessed via their exposed ports. This should
            almost always be "localhost". Only change it if you know what
            you're doing!
        use_centralized_log : bool, optional
            If True, Clipper sets up Fluentd and a database (currently SQLite)
            to centralize logs.
        fluentd_port : int, optional
            The port on which the fluentd logging driver should listen to
            centralize logs.
        clipper_query_port : int, optional
            The port on which the query frontend should listen for incoming
            prediction requests.
        clipper_management_port : int, optional
            The port on which the management frontend should expose the
            management REST API.
        clipper_rpc_port : int, optional
            The port to start the Clipper RPC service on.
        redis_ip : str, optional
            The address of a running Redis cluster. If set to None, Clipper
            will start a Redis container for you.
        redis_port : int, optional
            The Redis port. If ``redis_ip`` is set to None, Clipper will start
            Redis on this port. If ``redis_ip`` is provided, Clipper will
            connect to Redis on this port.
        prometheus_port : int, optional
            The port on which the Prometheus metric frontend should listen.
        docker_network : str, optional
            The docker network to attach the containers to. You can read more
            about Docker networking in the `Docker User Guide
            <https://docs.docker.com/engine/userguide/networking/>`_.
        extra_container_kwargs : dict
            Any additional keyword arguments to pass to the call to
            :py:meth:`docker.client.containers.run`.
        """
        self.cluster_name = cluster_name
        self.cluster_identifier = cluster_name  # For logging purposes
        self.public_hostname = docker_ip_address
        self.clipper_query_port = clipper_query_port
        self.clipper_management_port = clipper_management_port
        self.clipper_rpc_port = clipper_rpc_port
        self.redis_ip = redis_ip
        self.external_redis = redis_ip is not None
        self.redis_port = redis_port
        self.prometheus_port = prometheus_port
        self.centralize_log = use_centralized_log

        if docker_network == "host":
            raise ClipperException(
                "DockerContainerManager does not support running Clipper on "
                "the \"host\" docker network. Please pick a different network "
                "name")
        self.docker_network = docker_network

        self.docker_client = docker.from_env()

        if extra_container_kwargs is None:
            self.extra_container_kwargs = {}
        else:
            self.extra_container_kwargs = extra_container_kwargs.copy()

        # Merge Clipper-specific labels with any user-provided labels
        if "labels" in self.extra_container_kwargs:
            self.common_labels = self.extra_container_kwargs.pop("labels")
            self.common_labels.update({
                CLIPPER_DOCKER_LABEL: self.cluster_name
            })
        else:
            self.common_labels = {CLIPPER_DOCKER_LABEL: self.cluster_name}

        container_args = {
            "network": self.docker_network,
            "detach": True,
        }
        self.extra_container_kwargs.update(container_args)

        self.logger = ClusterAdapter(logger, {
            'cluster_name': self.cluster_identifier
        })

        # Set up Docker cluster logging.
        self.logging_system = Fluentd
        self.log_config = get_default_log_config()
        self.logging_system_instance = None
        if self.centralize_log:
            self.logging_system_instance = self.logging_system(
                self.logger,
                self.cluster_name,
                self.docker_client,
                port=find_unbound_port(fluentd_port))
            self.log_config = self.logging_system_instance.get_log_config()

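    # A minimal usage sketch (assumes the clipper_admin package is installed
    # and a local Docker daemon is running; "my-cluster" is a hypothetical
    # name). In normal use the manager is driven through ClipperConnection
    # rather than called directly:
    #
    #   from clipper_admin import ClipperConnection, DockerContainerManager
    #   manager = DockerContainerManager(cluster_name="my-cluster")
    #   conn = ClipperConnection(manager)
    #   conn.start_clipper()  # provisions Redis, the frontends, and Prometheus
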
    def start_clipper(self,
                      query_frontend_image,
                      mgmt_frontend_image,
                      frontend_exporter_image,
                      cache_size,
                      qf_http_thread_pool_size,
                      qf_http_timeout_request,
                      qf_http_timeout_content,
                      num_frontend_replicas=1):
        if num_frontend_replicas != 1:
            msg = "Docker container manager's query frontend scale-out " \
                  "hasn't been implemented. You can contribute to Clipper at " \
                  "https://github.com/ucbrise/clipper. " \
                  "Please set num_frontend_replicas=1 or use Kubernetes."
            raise ClipperException(msg)

        create_network(
            docker_client=self.docker_client, name=self.docker_network)

        containers_in_cluster = list_containers(
            docker_client=self.docker_client,
            filters={
                'label': [
                    '{key}={val}'.format(
                        key=CLIPPER_DOCKER_LABEL, val=self.cluster_name)
                ]
            })
        if len(containers_in_cluster) > 0:
            raise ClipperException(
                "Cluster {} cannot be started because it already exists. "
                "Please use ClipperConnection.connect() to connect to it.".
                format(self.cluster_name))

        if self.centralize_log:
            self.logging_system_instance.start(self.common_labels,
                                               self.extra_container_kwargs)

        # Redis for cluster configuration
        if not self.external_redis:
            self.logger.info("Starting managed Redis instance in Docker")
            redis_name = "redis-{}".format(random.randint(0, 100000))
            self.redis_port = find_unbound_port(self.redis_port)
            redis_labels = self.common_labels.copy()
            redis_labels[CLIPPER_DOCKER_PORT_LABELS['redis']] = str(
                self.redis_port)
            redis_container = run_container(
                docker_client=self.docker_client,
                image='redis:alpine',
                cmd="redis-server --port %s" % CLIPPER_INTERNAL_REDIS_PORT,
                log_config=self.log_config,
                name=redis_name,
                ports={
                    '%s/tcp' % CLIPPER_INTERNAL_REDIS_PORT: self.redis_port
                },
                labels=redis_labels,
                extra_container_kwargs=self.extra_container_kwargs)
            self.redis_ip = redis_container.name
            check_container_status(
                docker_client=self.docker_client, name=redis_name)

        # Management frontend
        mgmt_cmd = "--redis_ip={redis_ip} --redis_port={redis_port}".format(
            redis_ip=self.redis_ip, redis_port=CLIPPER_INTERNAL_REDIS_PORT)
        mgmt_name = "mgmt_frontend-{}".format(random.randint(0, 100000))
        self.clipper_management_port = find_unbound_port(
            self.clipper_management_port)
        mgmt_labels = self.common_labels.copy()
        mgmt_labels[CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL] = ""
        mgmt_labels[CLIPPER_DOCKER_PORT_LABELS['management']] = str(
            self.clipper_management_port)
        run_container(
            docker_client=self.docker_client,
            image=mgmt_frontend_image,
            cmd=mgmt_cmd,
            log_config=self.log_config,
            name=mgmt_name,
            ports={
                '%s/tcp' % CLIPPER_INTERNAL_MANAGEMENT_PORT:
                self.clipper_management_port
            },
            labels=mgmt_labels,
            extra_container_kwargs=self.extra_container_kwargs)
        check_container_status(
            docker_client=self.docker_client, name=mgmt_name)

        # Query frontend
        query_cmd = ("--redis_ip={redis_ip} --redis_port={redis_port} "
                     "--prediction_cache_size={cache_size} "
                     "--thread_pool_size={thread_pool_size} "
                     "--timeout_request={timeout_request} "
                     "--timeout_content={timeout_content}").format(
                         redis_ip=self.redis_ip,
                         redis_port=CLIPPER_INTERNAL_REDIS_PORT,
                         cache_size=cache_size,
                         thread_pool_size=qf_http_thread_pool_size,
                         timeout_request=qf_http_timeout_request,
                         timeout_content=qf_http_timeout_content)
        query_container_id = random.randint(0, 100000)
        query_name = "query_frontend-{}".format(query_container_id)
        self.clipper_query_port = find_unbound_port(self.clipper_query_port)
        self.clipper_rpc_port = find_unbound_port(self.clipper_rpc_port)
        query_labels = self.common_labels.copy()
        query_labels[CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL] = ""
        query_labels[CLIPPER_DOCKER_PORT_LABELS['query_rest']] = str(
            self.clipper_query_port)
        query_labels[CLIPPER_DOCKER_PORT_LABELS['query_rpc']] = str(
            self.clipper_rpc_port)
        run_container(
            docker_client=self.docker_client,
            image=query_frontend_image,
            cmd=query_cmd,
            log_config=self.log_config,
            name=query_name,
            ports={
                '%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT:
                self.clipper_query_port,
                '%s/tcp' % CLIPPER_INTERNAL_RPC_PORT: self.clipper_rpc_port
            },
            labels=query_labels,
            extra_container_kwargs=self.extra_container_kwargs)
        check_container_status(
            docker_client=self.docker_client, name=query_name)

        # Metric Section
        query_frontend_metric_name = "query_frontend_exporter-{}".format(
            query_container_id)
        run_query_frontend_metric_image(
            query_frontend_metric_name, self.docker_client, query_name,
            frontend_exporter_image, self.common_labels, self.log_config,
            self.extra_container_kwargs)
        check_container_status(
            docker_client=self.docker_client, name=query_frontend_metric_name)

        self.prom_config_path = tempfile.NamedTemporaryFile(
            'w', suffix='.yml', delete=False).name
        self.prom_config_path = os.path.realpath(
            self.prom_config_path)  # resolve symlink
        self.logger.info("Metric Configuration Saved at {path}".format(
            path=self.prom_config_path))
        setup_metric_config(query_frontend_metric_name, self.prom_config_path,
                            CLIPPER_INTERNAL_METRIC_PORT)

        metric_frontend_name = "metric_frontend-{}".format(
            random.randint(0, 100000))
        self.prometheus_port = find_unbound_port(self.prometheus_port)
        metric_labels = self.common_labels.copy()
        metric_labels[CLIPPER_DOCKER_PORT_LABELS['metric']] = str(
            self.prometheus_port)
        metric_labels[CLIPPER_METRIC_CONFIG_LABEL] = self.prom_config_path
        run_metric_image(metric_frontend_name, self.docker_client,
                         metric_labels, self.prometheus_port,
                         self.prom_config_path, self.log_config,
                         self.extra_container_kwargs)
        check_container_status(
            docker_client=self.docker_client, name=metric_frontend_name)

        self.connect()

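    # After start_clipper returns, a default cluster consists of five
    # containers (each name carries a random numeric suffix): redis-*,
    # mgmt_frontend-*, query_frontend-*, query_frontend_exporter-*, and
    # metric_frontend-*. They can be listed with the cluster label filter,
    # e.g. (label key shown symbolically):
    #
    #   docker ps --filter label=<CLIPPER_DOCKER_LABEL>=default-cluster
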
    def connect(self):
        """
        Use the cluster name to discover the containers and update the
        recorded ports, since the ports set in __init__ may have been rebound
        to different host ports by start_clipper.

        :return: None
        """
        containers = list_containers(
            docker_client=self.docker_client,
            filters={
                'label': [
                    '{key}={val}'.format(
                        key=CLIPPER_DOCKER_LABEL, val=self.cluster_name)
                ]
            })
        all_labels = {}
        for container in containers:
            all_labels.update(container.labels)

        if CLIPPER_DOCKER_PORT_LABELS['redis'] in all_labels:
            self.redis_port = all_labels[CLIPPER_DOCKER_PORT_LABELS['redis']]
        self.clipper_management_port = all_labels[
            CLIPPER_DOCKER_PORT_LABELS['management']]
        self.clipper_query_port = all_labels[
            CLIPPER_DOCKER_PORT_LABELS['query_rest']]
        self.clipper_rpc_port = all_labels[
            CLIPPER_DOCKER_PORT_LABELS['query_rpc']]
        self.prometheus_port = all_labels[
            CLIPPER_DOCKER_PORT_LABELS['metric']]
        self.prom_config_path = all_labels[CLIPPER_METRIC_CONFIG_LABEL]

        if self._is_valid_logging_state_to_connect(all_labels):
            self.connect_to_logging_system(all_labels)

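    # For illustration, the same label-based discovery that connect() performs
    # can be done directly with the Docker SDK (a minimal sketch;
    # "default-cluster" is the default cluster_name):
    #
    #   import docker
    #   from clipper_admin.container_manager import CLIPPER_DOCKER_LABEL
    #   client = docker.from_env()
    #   label = '{}={}'.format(CLIPPER_DOCKER_LABEL, 'default-cluster')
    #   for c in client.containers.list(filters={'label': label}):
    #       print(c.name, c.labels)
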
    def deploy_model(self, name, version, input_type, image, num_replicas=1):
        # Parameters
        # ----------
        # image : str
        #     The fully specified Docker image to deploy. If using a custom
        #     registry, the registry name must be prepended to the image. For
        #     example, "localhost:5000/my_model_name:my_model_version" or
        #     "quay.io/my_namespace/my_model_name:my_model_version"
        self.set_num_replicas(name, version, input_type, image, num_replicas)

    def _get_replicas(self, name, version):
        containers = list_containers(
            docker_client=self.docker_client,
            filters={
                "label": [
                    "{key}={val}".format(
                        key=CLIPPER_DOCKER_LABEL, val=self.cluster_name),
                    "{key}={val}".format(
                        key=CLIPPER_MODEL_CONTAINER_LABEL,
                        val=create_model_container_label(name, version))
                ]
            })
        return containers

    def get_num_replicas(self, name, version):
        return len(self._get_replicas(name, version))

    def _add_replica(self, name, version, input_type, image):
        containers = list_containers(
            docker_client=self.docker_client,
            filters={
                "label": [
                    "{key}={val}".format(
                        key=CLIPPER_DOCKER_LABEL, val=self.cluster_name),
                    CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL
                ]
            })
        if len(containers) < 1:
            self.logger.warning("No Clipper query frontend found.")
            raise ClipperException(
                "No Clipper query frontend to attach model container to")
        query_frontend_hostname = containers[0].name
        env_vars = {
            "CLIPPER_MODEL_NAME": name,
            "CLIPPER_MODEL_VERSION": version,
            # NOTE: assumes this container is launched on the same machine
            # and in the same docker network as the query frontend
            "CLIPPER_IP": query_frontend_hostname,
            "CLIPPER_INPUT_TYPE": input_type,
        }

        model_container_label = create_model_container_label(name, version)
        labels = self.common_labels.copy()
        labels[CLIPPER_MODEL_CONTAINER_LABEL] = model_container_label
        labels[CLIPPER_DOCKER_LABEL] = self.cluster_name

        model_container_name = model_container_label + '-{}'.format(
            random.randint(0, 100000))
        run_container(
            docker_client=self.docker_client,
            image=image,
            name=model_container_name,
            environment=env_vars,
            labels=labels,
            log_config=self.log_config,
            extra_container_kwargs=self.extra_container_kwargs)

        # Metric Section
        add_to_metric_config(model_container_name, self.prom_config_path,
                             self.prometheus_port,
                             CLIPPER_INTERNAL_METRIC_PORT)

        # Return model_container_name so we can check whether it's up and
        # running later
        return model_container_name

    def set_num_replicas(self, name, version, input_type, image,
                         num_replicas):
        current_replicas = self._get_replicas(name, version)
        if len(current_replicas) < num_replicas:
            num_missing = num_replicas - len(current_replicas)
            self.logger.info(
                "Found {cur} replicas for {name}:{version}. Adding {missing}".
                format(
                    cur=len(current_replicas),
                    name=name,
                    version=version,
                    missing=num_missing))
            model_container_names = []
            for _ in range(num_missing):
                container_name = self._add_replica(name, version, input_type,
                                                   image)
                model_container_names.append(container_name)
            # Use a distinct loop variable so the `name` parameter isn't
            # shadowed.
            for container_name in model_container_names:
                check_container_status(
                    docker_client=self.docker_client, name=container_name)
        elif len(current_replicas) > num_replicas:
            num_extra = len(current_replicas) - num_replicas
            self.logger.info(
                "Found {cur} replicas for {name}:{version}. Removing {extra}".
                format(
                    cur=len(current_replicas),
                    name=name,
                    version=version,
                    extra=num_extra))
            while len(current_replicas) > num_replicas:
                cur_container = current_replicas.pop()
                cur_container.stop()
                # Metric Section
                delete_from_metric_config(cur_container.name,
                                          self.prom_config_path,
                                          self.prometheus_port)

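    # Deployment sketch ("simple-model" and the registry path are
    # hypothetical; in normal use ClipperConnection.deploy_model also
    # registers the model with the management frontend):
    #
    #   manager.deploy_model(name="simple-model", version="1",
    #                        input_type="doubles",
    #                        image="localhost:5000/simple-model:1",
    #                        num_replicas=2)
    #   manager.get_num_replicas("simple-model", "1")  # -> 2
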
    def get_logs(self, logging_dir):
        if self.centralize_log:
            return self.logging_system_instance.get_logs(logging_dir)
        else:
            return get_logs_from_containers(self, logging_dir)

    def stop_models(self, models):
        containers = list_containers(
            docker_client=self.docker_client,
            filters={
                "label": [
                    CLIPPER_MODEL_CONTAINER_LABEL,
                    "{key}={val}".format(
                        key=CLIPPER_DOCKER_LABEL, val=self.cluster_name)
                ]
            })
        for c in containers:
            c_name, c_version = parse_model_container_label(
                c.labels[CLIPPER_MODEL_CONTAINER_LABEL])
            if c_name in models and c_version in models[c_name]:
                c.stop()

    def stop_all_model_containers(self):
        containers = list_containers(
            docker_client=self.docker_client,
            filters={
                "label": [
                    CLIPPER_MODEL_CONTAINER_LABEL,
                    "{key}={val}".format(
                        key=CLIPPER_DOCKER_LABEL, val=self.cluster_name)
                ]
            })
        for c in containers:
            c.stop()

    def stop_all(self, graceful=True):
        containers = list_containers(
            docker_client=self.docker_client,
            filters={
                "label": "{key}={val}".format(
                    key=CLIPPER_DOCKER_LABEL, val=self.cluster_name)
            })
        for c in containers:
            if graceful:
                c.stop()
            else:
                c.kill()

    def _is_valid_logging_state_to_connect(self, all_labels):
        if self.centralize_log and \
                not self.logging_system.container_is_running(all_labels):
            raise ClipperException(
                "Invalid state detected. "
                "Log centralization is {log_centralization_state}, "
                "but no running fluentd instance was found. "
                "Please change the use_centralized_log parameter of "
                "DockerContainerManager.".format(
                    log_centralization_state=self.centralize_log))
        return self.logging_system.container_is_running(all_labels)

    def connect_to_logging_system(self, all_labels):
        if not self.centralize_log:
            logger.info(
                "The current DockerContainerManager's use_centralized_log "
                "flag is False, but a {type} logging system instance is "
                "running in the cluster. This means the Clipper cluster you "
                "want to connect to uses log centralization, so the flag "
                "will be enabled to avoid unexpected bugs.".format(
                    type=self.logging_system.get_type()))
            self.centralize_log = True

        self.logging_system_instance = self.logging_system(
            self.logger,
            self.cluster_name,
            self.docker_client,
            port=all_labels[CLIPPER_DOCKER_PORT_LABELS['fluentd']],
            conf_path=all_labels[CLIPPER_FLUENTD_CONFIG_LABEL])
        self.log_config = self.logging_system_instance.get_log_config()

    def get_admin_addr(self):
        return "{host}:{port}".format(
            host=self.public_hostname, port=self.clipper_management_port)

    def get_query_addr(self):
        return "{host}:{port}".format(
            host=self.public_hostname, port=self.clipper_query_port)

    def get_metric_addr(self):
        return "{host}:{port}".format(
            host=self.public_hostname, port=self.prometheus_port)

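# The address getters above make it easy to reach the exposed REST endpoints
# directly. A minimal sketch (assumes an application named "app" has already
# been registered via ClipperConnection and that the `requests` package is
# installed):
#
#   import json, requests
#   url = "http://{}/app/predict".format(manager.get_query_addr())
#   r = requests.post(url, headers={"Content-Type": "application/json"},
#                     data=json.dumps({"input": [1.1, 2.2, 3.3]}))
#   print(r.json())
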
def find_unbound_port(start=None,
                      increment=False,
                      port_range=(10000, 50000),
                      verbose=False,
                      logger=None):
    """
    Find an unbound port.

    Parameters
    ----------
    start : int
        The port number to start with. If this port is unbound, return it.
        If None, start from a random port.
    increment : bool
        If True, search by incrementing the start port; otherwise, search
        randomly.
    port_range : tuple
        The range used for random port generation.
    verbose : bool
        Verbose flag for logging.
    logger : logging.Logger
    """
    while True:
        if not start:
            start = random.randint(*port_range)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(("127.0.0.1", start))
            # Make sure we clean up after binding
            sock.close()
            return start
        except socket.error as e:
            if verbose and logger:
                logger.info("Socket error: {}".format(e))
                logger.info(
                    "Randomly generated port %d is bound. Trying again." %
                    start)
            if increment:
                start += 1
            else:
                start = random.randint(*port_range)
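
# Usage sketch for find_unbound_port:
#
#   port = find_unbound_port(9090)                  # 9090 if free, else random
#   port = find_unbound_port(9090, increment=True)  # try 9090, 9091, ...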