Source code for clipper_admin.docker.docker_container_manager

from __future__ import absolute_import, division, print_function
import docker
import logging
import os
import sys
import random
import time
import json
from ..container_manager import (
    create_model_container_label, parse_model_container_label,
    ContainerManager, CLIPPER_DOCKER_LABEL, CLIPPER_MODEL_CONTAINER_LABEL,
    CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL,
    CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL, CLIPPER_INTERNAL_RPC_PORT,
    CLIPPER_INTERNAL_QUERY_PORT, CLIPPER_INTERNAL_MANAGEMENT_PORT,
    CLIPPER_INTERNAL_METRIC_PORT)
from ..exceptions import ClipperException
from requests.exceptions import ConnectionError
from .docker_metric_utils import *

logger = logging.getLogger(__name__)


class DockerContainerManager(ContainerManager):
    """Container manager that runs Clipper on a Docker daemon.

    Every container started by this class is attached to ``docker_network``
    and tagged with ``CLIPPER_DOCKER_LABEL`` so that it can be located again
    later through label filters.
    """

    def __init__(self,
                 docker_ip_address="localhost",
                 clipper_query_port=1337,
                 clipper_management_port=1338,
                 clipper_rpc_port=7000,
                 redis_ip=None,
                 redis_port=6379,
                 prometheus_port=9090,
                 docker_network="clipper_network",
                 extra_container_kwargs=None):
        """
        Parameters
        ----------
        docker_ip_address : str, optional
            The public hostname or IP address at which the Clipper Docker
            containers can be accessed via their exposed ports. This should almost always
            be "localhost". Only change if you know what you're doing!
        clipper_query_port : int, optional
            The port on which the query frontend should listen for incoming prediction requests.
        clipper_management_port : int, optional
            The port on which the management frontend should expose the management REST API.
        clipper_rpc_port : int, optional
            The port to start the Clipper RPC service on.
        redis_ip : str, optional
            The address of a running Redis cluster. If set to None, Clipper will start
            a Redis container for you.
        redis_port : int, optional
            The Redis port. If ``redis_ip`` is set to None, Clipper will start Redis on this port.
            If ``redis_ip`` is provided, Clipper will connect to Redis on this port.
        prometheus_port : int, optional
            The port handed to the metric container that is started by
            :py:meth:`start_clipper`.
        docker_network : str, optional
            The docker network to attach the containers to. You can read more about Docker
            networking in the
            `Docker User Guide <https://docs.docker.com/engine/userguide/networking/>`_.
        extra_container_kwargs : dict, optional
            Any additional keyword arguments to pass to the call to
            :py:meth:`docker.client.containers.run`. The dict (and any
            ``"labels"`` dict inside it) is copied, never modified.

        Raises
        ------
        ClipperException
            If ``docker_network`` is ``"host"``.
        """
        self.public_hostname = docker_ip_address
        self.clipper_query_port = clipper_query_port
        self.clipper_management_port = clipper_management_port
        self.clipper_rpc_port = clipper_rpc_port
        self.redis_ip = redis_ip
        # Redis is "external" when the caller supplied an address for it;
        # otherwise start_clipper launches a managed Redis container.
        self.external_redis = redis_ip is not None
        self.redis_port = redis_port
        self.prometheus_port = prometheus_port

        # Compare by value: ``is`` on a string literal tests object identity
        # and is not guaranteed to match an equal string.
        if docker_network == "host":
            raise ClipperException(
                "DockerContainerManager does not support running Clipper on the "
                "\"host\" docker network. Please pick a different network name"
            )
        self.docker_network = docker_network

        self.docker_client = docker.from_env()

        # Work on private copies so the caller's dicts are left untouched.
        self.extra_container_kwargs = dict(extra_container_kwargs or {})

        # Merge Clipper-specific labels with any user-provided labels
        user_labels = self.extra_container_kwargs.pop("labels", None)
        self.common_labels = dict(user_labels) if user_labels else {}
        self.common_labels[CLIPPER_DOCKER_LABEL] = ""

        # These two settings always override whatever the caller passed.
        container_args = {
            "network": self.docker_network,
            "detach": True,
        }
        self.extra_container_kwargs.update(container_args)

    def start_clipper(self,
                      query_frontend_image,
                      mgmt_frontend_image,
                      cache_size,
                      num_frontend_replicas=1):
        """Start Redis (unless external), both frontends and the metric containers.

        Parameters
        ----------
        query_frontend_image : str
            Docker image to run as the query frontend.
        mgmt_frontend_image : str
            Docker image to run as the management frontend.
        cache_size : int
            Value passed to the query frontend as ``--prediction_cache_size``.
        num_frontend_replicas : int, optional
            Must be 1; query frontend scale-out is not implemented here.

        Raises
        ------
        ClipperException
            If ``num_frontend_replicas`` is not 1, or if the Docker daemon
            cannot be reached while creating the network.
        """
        if num_frontend_replicas != 1:
            # Adjacent literals instead of backslash continuations, which
            # would embed the source indentation in the message.
            msg = ("Docker container manager's query frontend scale-out "
                   "hasn't been implemented. Please set num_frontend_replicas=1 "
                   "or use Kubernetes.")
            raise ClipperException(msg)

        try:
            self.docker_client.networks.create(
                self.docker_network, check_duplicate=True)
        except docker.errors.APIError:
            # NOTE(review): every APIError is treated as "network already
            # exists"; other API failures are only visible at debug level.
            logger.debug(
                "{nw} network already exists".format(nw=self.docker_network))
        except ConnectionError:
            msg = "Unable to Connect to Docker. Please Check if Docker is running."
            raise ClipperException(msg)

        if not self.external_redis:
            logger.info("Starting managed Redis instance in Docker")
            redis_container = self.docker_client.containers.run(
                'redis:alpine',
                "redis-server --port %s" % self.redis_port,
                name="redis-{}".format(random.randint(
                    0, 100000)),  # generate a random name
                ports={'%s/tcp' % self.redis_port: self.redis_port},
                labels=self.common_labels.copy(),
                **self.extra_container_kwargs)
            # Containers on the same network reach Redis by container name.
            self.redis_ip = redis_container.name

        mgmt_cmd = "--redis_ip={redis_ip} --redis_port={redis_port}".format(
            redis_ip=self.redis_ip, redis_port=self.redis_port)
        mgmt_labels = self.common_labels.copy()
        mgmt_labels[CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL] = ""
        self.docker_client.containers.run(
            mgmt_frontend_image,
            mgmt_cmd,
            name="mgmt_frontend-{}".format(random.randint(
                0, 100000)),  # generate a random name
            ports={
                '%s/tcp' % CLIPPER_INTERNAL_MANAGEMENT_PORT:
                self.clipper_management_port
            },
            labels=mgmt_labels,
            **self.extra_container_kwargs)

        query_cmd = ("--redis_ip={redis_ip} --redis_port={redis_port} "
                     "--prediction_cache_size={cache_size}").format(
                         redis_ip=self.redis_ip,
                         redis_port=self.redis_port,
                         cache_size=cache_size)
        query_labels = self.common_labels.copy()
        query_labels[CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL] = ""
        # The same random id names the frontend and its metric exporter.
        query_container_id = random.randint(0, 100000)
        query_name = "query_frontend-{}".format(query_container_id)
        self.docker_client.containers.run(
            query_frontend_image,
            query_cmd,
            name=query_name,
            ports={
                '%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT:
                self.clipper_query_port,
                '%s/tcp' % CLIPPER_INTERNAL_RPC_PORT: self.clipper_rpc_port
            },
            labels=query_labels,
            **self.extra_container_kwargs)

        # Metric Section
        query_frontend_metric_name = "query_frontend_exporter-{}".format(
            query_container_id)
        run_query_frontend_metric_image(
            query_frontend_metric_name, self.docker_client, query_name,
            self.common_labels, self.extra_container_kwargs)
        setup_metric_config(query_frontend_metric_name,
                            CLIPPER_INTERNAL_METRIC_PORT)
        run_metric_image(self.docker_client, self.common_labels,
                         self.prometheus_port, self.extra_container_kwargs)

        self.connect()

    def connect(self):
        """No extra connection steps are needed for Docker; does nothing."""
        return

    def deploy_model(self, name, version, input_type, image, num_replicas=1):
        """Deploy a model by scaling it to ``num_replicas`` containers.

        Parameters
        ----------
        name : str
            The model name.
        version : str
            The model version.
        input_type : str
            Passed to each container as ``CLIPPER_INPUT_TYPE``.
        image : str
            The fully specified Docker image to deploy. If using a custom
            registry, the registry name must be prepended to the image. For example,
            "localhost:5000/my_model_name:my_model_version" or
            "quay.io/my_namespace/my_model_name:my_model_version"
        num_replicas : int, optional
            The number of replicas of the model to run.
        """
        self.set_num_replicas(name, version, input_type, image, num_replicas)

    def _get_replicas(self, name, version):
        """Return the listed containers labelled as replicas of ``name:version``."""
        containers = self.docker_client.containers.list(
            filters={
                "label":
                "{key}={val}".format(
                    key=CLIPPER_MODEL_CONTAINER_LABEL,
                    val=create_model_container_label(name, version))
            })
        return containers

    def get_num_replicas(self, name, version):
        """Return how many replica containers are listed for ``name:version``."""
        return len(self._get_replicas(name, version))

    def _add_replica(self, name, version, input_type, image):
        """Start one model container and register it in the metric config.

        Returns
        -------
        str
            The name of the new container, so callers can wait for it to
            come up.

        Raises
        ------
        ClipperException
            If no query frontend container is found.
        """
        containers = self.docker_client.containers.list(
            filters={
                "label": CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL
            })
        if len(containers) < 1:
            logger.warning("No Clipper query frontend found.")
            raise ClipperException(
                "No Clipper query frontend to attach model container to")
        query_frontend_hostname = containers[0].name
        env_vars = {
            "CLIPPER_MODEL_NAME": name,
            "CLIPPER_MODEL_VERSION": version,
            # NOTE: assumes this container being launched on same machine
            # in same docker network as the query frontend
            "CLIPPER_IP": query_frontend_hostname,
            "CLIPPER_INPUT_TYPE": input_type,
        }

        model_container_label = create_model_container_label(name, version)
        labels = self.common_labels.copy()
        labels[CLIPPER_MODEL_CONTAINER_LABEL] = model_container_label

        model_container_name = model_container_label + '-{}'.format(
            random.randint(0, 100000))
        self.docker_client.containers.run(
            image,
            name=model_container_name,
            environment=env_vars,
            labels=labels,
            **self.extra_container_kwargs)

        # Metric Section
        add_to_metric_config(model_container_name,
                             CLIPPER_INTERNAL_METRIC_PORT)

        # Return model_container_name so we can check if it's up and running later
        return model_container_name

    def _wait_for_replica(self, container_name, poll_interval=3):
        """Block until ``container_name`` is running and, if reported, healthy.

        The state is re-inspected on every poll, so a container that was not
        yet running on the first check is still picked up once it starts.
        Containers that report no ``Health`` section (no health check
        configured) are considered ready as soon as they are running.
        """
        while True:
            details = self.docker_client.api.inspect_container(container_name)
            state = details.get("State") or {}
            health = state.get("Health")
            running = state.get("Status") == "running"
            healthy = health is None or health.get("Status") == "healthy"
            if running and healthy:
                return
            time.sleep(poll_interval)

    def set_num_replicas(self, name, version, input_type, image, num_replicas):
        """Add or stop containers so that ``num_replicas`` replicas remain.

        When scaling up, this blocks until every newly started container is
        running (and healthy, when it reports health). When scaling down,
        surplus containers are stopped and removed from the metric config.
        """
        current_replicas = self._get_replicas(name, version)
        if len(current_replicas) < num_replicas:
            num_missing = num_replicas - len(current_replicas)
            logger.info(
                "Found {cur} replicas for {name}:{version}. Adding {missing}".
                format(
                    cur=len(current_replicas),
                    name=name,
                    version=version,
                    missing=(num_missing)))

            # Launch everything first, then wait, so replicas start in parallel.
            model_container_names = [
                self._add_replica(name, version, input_type, image)
                for _ in range(num_missing)
            ]
            for container_name in model_container_names:
                self._wait_for_replica(container_name)

        elif len(current_replicas) > num_replicas:
            num_extra = len(current_replicas) - num_replicas
            logger.info(
                "Found {cur} replicas for {name}:{version}. Removing {extra}".
                format(
                    cur=len(current_replicas),
                    name=name,
                    version=version,
                    extra=(num_extra)))
            while len(current_replicas) > num_replicas:
                cur_container = current_replicas.pop()
                cur_container.stop()
                # Metric Section
                delete_from_metric_config(cur_container.name)

    def get_logs(self, logging_dir):
        """Write the logs of every Clipper-labelled container to ``logging_dir``.

        The directory is created if it does not exist.

        Returns
        -------
        list(str)
            Paths of the log files that were written, one per container.
        """
        containers = self.docker_client.containers.list(
            filters={
                "label": CLIPPER_DOCKER_LABEL
            })
        logging_dir = os.path.abspath(os.path.expanduser(logging_dir))

        log_files = []
        if not os.path.exists(logging_dir):
            os.makedirs(logging_dir)
            logger.info("Created logging directory: %s" % logging_dir)
        for c in containers:
            log_file_name = "image_{image}:container_{id}.log".format(
                image=c.image.short_id, id=c.short_id)
            log_file = os.path.join(logging_dir, log_file_name)
            # The log payload is written as-is; binary mode avoids any
            # newline translation or implicit encoding step.
            with open(log_file, "wb") as lf:
                lf.write(c.logs(stdout=True, stderr=True))
            log_files.append(log_file)
        return log_files

    def stop_models(self, models):
        """Stop the model containers selected by ``models``.

        Parameters
        ----------
        models : dict
            Maps a model name to a collection of versions. A container is
            stopped when its parsed label name is a key of ``models`` and
            its version is contained in the corresponding value.
        """
        containers = self.docker_client.containers.list(
            filters={
                "label": CLIPPER_MODEL_CONTAINER_LABEL
            })
        for c in containers:
            c_name, c_version = parse_model_container_label(
                c.labels[CLIPPER_MODEL_CONTAINER_LABEL])
            if c_name in models and c_version in models[c_name]:
                c.stop()

    def stop_all_model_containers(self):
        """Stop every container carrying the model container label."""
        containers = self.docker_client.containers.list(
            filters={
                "label": CLIPPER_MODEL_CONTAINER_LABEL
            })
        for c in containers:
            c.stop()

    def stop_all(self):
        """Stop every container carrying the Clipper label."""
        containers = self.docker_client.containers.list(
            filters={
                "label": CLIPPER_DOCKER_LABEL
            })
        for c in containers:
            c.stop()

    def get_admin_addr(self):
        """Return ``"host:port"`` built from the public hostname and management port."""
        return "{host}:{port}".format(
            host=self.public_hostname, port=self.clipper_management_port)

    def get_query_addr(self):
        """Return ``"host:port"`` built from the public hostname and query port."""
        return "{host}:{port}".format(
            host=self.public_hostname, port=self.clipper_query_port)