
KubeCluster Client -> Scheduler: missing port number in address 'kubernetes' #405

@haf

Description

What happened:

When spawning a new k8s cluster, I get these errors from pod run-mmm-dask-27408369-swmpw:

CRITICAL:our_package.main:Running of program failed: missing port number in address 'kubernetes'
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/our_package/main.py", line 84, in main
    return run(config, sem)
  File "/venv/lib/python3.9/site-packages/our_package/main.py", line 70, in run
    return run_dask(config)
  File "/venv/lib/python3.9/site-packages/our_package/impl_dask.py", line 404, in run_dask
    client = create_client(config)
  File "/venv/lib/python3.9/site-packages/our_package/impl_dask.py", line 257, in create_client
    return create_kube(config)
  File "/venv/lib/python3.9/site-packages/our_package/impl_dask.py", line 246, in create_kube
    return Client(k)
  File "/venv/lib/python3.9/site-packages/distributed/client.py", line 923, in __init__
    self.start(timeout=timeout)
  File "/venv/lib/python3.9/site-packages/distributed/client.py", line 1081, in start
    sync(self.loop, self._start, **kwargs)
  File "/venv/lib/python3.9/site-packages/distributed/utils.py", line 363, in sync
    raise exc.with_traceback(tb)
  File "/venv/lib/python3.9/site-packages/distributed/utils.py", line 348, in f
    result[0] = yield future
  File "/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/venv/lib/python3.9/site-packages/distributed/client.py", line 1173, in _start
    await self._ensure_connected(timeout=timeout)
  File "/venv/lib/python3.9/site-packages/distributed/client.py", line 1232, in _ensure_connected
    comm = await connect(
  File "/venv/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/venv/lib/python3.9/site-packages/distributed/comm/tcp.py", line 405, in connect
    ip, port = parse_host_port(address)
  File "/venv/lib/python3.9/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
    port = _default()
  File "/venv/lib/python3.9/site-packages/distributed/comm/addressing.py", line 73, in _default
    raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address 'kubernetes'

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/venv/lib/python3.9/site-packages/our_package/main.py", line 91, in <module>
    sys.exit(main(sys.argv[1:]))
  File "/venv/lib/python3.9/site-packages/our_package/main.py", line 87, in main

With versions:


[[package]]
name = "dask-kubernetes"
version = "2022.1.0"
description = "Native Kubernetes integration for Dask"
category = "main"
optional = false
python-versions = ">=3.7"

What you expected to happen:

I haven't declared an address 'kubernetes' anywhere, so I don't expect the client to crash trying to connect to it.

I've disabled Istio sidecar injection, so this should not be an Istio problem.

When looking at the pods, both a worker and a scheduler are created in the cluster, and their logs show them connecting to each other.

The code runs in the same cluster that it deploys to.
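
For what it's worth, the failing check is easy to reproduce in isolation with distributed's own address parser (the module in the last traceback frame). A minimal sketch; the host name is just a placeholder:

from distributed.comm.addressing import parse_host_port

# A host:port pair parses cleanly:
print(parse_host_port("run-mmm-dask:8786"))  # ('run-mmm-dask', 8786)

# A bare host with no port, and no default port, raises exactly the error above:
parse_host_port("kubernetes")  # ValueError: missing port number in address 'kubernetes'

So whatever reaches the TCP connector here is the literal string 'kubernetes', not a tcp://host:port scheduler address.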

Minimal Complete Verifiable Example:

import logging
from os import getenv

from dask import config as dconfig
from dask.distributed import Client, LocalCluster, as_completed
from dask_kubernetes import KubeCluster
from distributed import Future

logger = logging.getLogger(__name__)

def create_kube(config):
    env = [
        {
            "name": "K8S_NODE_NAME",
            "valueFrom": {"fieldRef": {"fieldPath": "spec.nodeName"}},
        },
        {
            "name": "K8S_POD_NAME",
            "valueFrom": {"fieldRef": {"fieldPath": "metadata.name"}},
        },
        {
            "name": "EXAMPLE",
            "valueFrom": {
                "secretKeyRef": {
                    "name": "analytics-pguser-modelruns",
                    "key": "host",
                },
            },
        },
    ]

    # v1/Pod spec
    pod_template = {
        "metadata": {
            "annotations": {
                "sidecar.istio.io/inject": "false",
            },
            "labels": {
                "app": "run-mmm",
                "component": "worker",
                "via": "DaskCluster",
            },
        },
        "spec": {
            "serviceAccountName": "dask-worker",
            "tolerations": [
                {
                    "key": "dedicated",
                    "operator": "Equal",
                    "value": "dask",
                    "effect": "NoSchedule",
                },
            ],
            "affinity": {
                "nodeAffinity": {
                    "requiredDuringSchedulingIgnoredDuringExecution": {
                        "nodeSelectorTerms": [
                            {
                                "matchExpressions": [
                                    {
                                        "key": "dedicated",
                                        "operator": "In",
                                        "values": ["dask"],
                                    },
                                ]
                            },
                        ]
                    }
                }
            },
            "containers": [
                {
                    "name": "worker",
                    "image": config.dask.kube_image,
                    "args": [
                        "dask-worker",
                        "$(DASK_SCHEDULER_ADDRESS)",
                        "--no-dashboard",
                        "--nthreads",
                        "40",
                        "--nprocs",
                        "1",
                        "--death-timeout",
                        "60",
                        "--memory-limit",
                        "64GB",
                    ],
                    "env": env,
                    "resources": {
                        "requests": {
                            "cpu": "15000m",
                            "memory": "60G",
                        },
                        "limits": {
                            "cpu": "16000m",
                            "memory": "64G",
                        },
                    },
                }
            ],
            "restartPolicy": "Never",
        },
    }

    scheduler_pod_template = {
        **pod_template,
        "metadata": {
            "annotations": {
                "sidecar.istio.io/inject": "false",
            },
            "labels": {
                "app": "run-mmm",
                "component": "scheduler",
                "via": "DaskCluster",
            },
        },
        "tolerations": [],
        "affinity": None,
        "spec": {
            **pod_template["spec"],
            "containers": [
                {
                    "name": "scheduler",
                    "image": config.dask.kube_image,
                    "args": [
                        "dask-scheduler",
                        "--port",
                        "8786",
                        "--bokeh-port",
                        "8787",
                    ],
                    "resources": {
                        "requests": {
                            "cpu": "100m",
                            "memory": "256Mi",
                        },
                        "limits": {
                            "cpu": "2",
                            "memory": "2Gi",
                        },
                    },
                }
            ],
        },
    }

    k = KubeCluster(
        pod_template,
        name=getenv("K8S_POD_NAME", default="run-mmm-dask"),
        namespace=config.dask.kube_ns,
        scheduler_service_wait_timeout=60 * 10,
        scheduler_pod_template=scheduler_pod_template,
    )

    # k.adapt(minimum=config.dask.kube_min, maximum=config.dask.kube_max)
    k.scale(3)

    logger.info(f"Using scheduler_address={k.scheduler_address}")

    logger.info("Using Dask config:")
    for k, v in dconfig.config.items():
        logger.info(f"{str(k)}={str(v)}")

    return Client(k)
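
In isolation, the rebinding flagged in the comment above behaves like this (a sketch; whether "kubernetes" is really the last top-level config key depends on what is installed and imported, so treat that as an assumption):

from dask import config as dconfig

k = object()  # stands in for the KubeCluster instance
for k, v in dconfig.config.items():
    pass

# After the loop, `k` is the last top-level config key (a string). With
# dask-kubernetes installed that can plausibly be "kubernetes", which would
# then be passed to Client() as if it were a scheduler address.
print(type(k), k)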

Anything else we need to know?:

Environment:


[[package]]
name = "dask"
version = "2022.1.1"
description = "Parallel PyData with Task Scheduling"
category = "main"
optional = false
python-versions = ">=3.7"

  • Python version: 3.9, from the following Dockerfile:

FROM python:3.9-slim as base

ENV VIRTUAL_ENV=/venv \
  PIP_NO_CACHE_DIR=1 \
  PIP_DISABLE_PIP_VERSION_CHECK=1 \
  PIP_DEFAULT_TIMEOUT=100 \
  BUILD_ESSENTIAL_VERSION=12.9

WORKDIR /app

# We're putting the installation of these packages in the base image because cmdstanpy
# actually compiles the models at runtime rather than at build time. While this makes
# for more lax security, this container is not expected to listen to user-supplied
# inputs.
RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
      tzdata \
      ca-certificates \
      build-essential=${BUILD_ESSENTIAL_VERSION} \
      curl \
      python3-dev \
      libssl-dev \
    && rm -rf /var/lib/apt/lists/*


FROM base as builder

RUN python -m venv ${VIRTUAL_ENV}
ENV PATH="$VIRTUAL_ENV/bin:$PATH" \
  CMDSTANPY_VERSION=1.0.0

# When we install cmdstanpy here, we're also installing e.g. ujson, pandas and numpy,
# because cmdstanpy depends on those; but then later, when installing the poetry-built
# wheels, we'll uninstall them and install their locked versions. This is a trade-off
# against running the cmdstanpy install after the wheels, which is much worse from a
# cache-busting perspective.
RUN pip install --upgrade "cmdstanpy==${CMDSTANPY_VERSION}" wheel

COPY cmdstanpy-install.py ./
RUN HOME=/app python cmdstanpy-install.py

RUN echo VIRTUAL_ENV=${VIRTUAL_ENV}

# deps
COPY ./deps/*.whl ./
RUN pip install *.whl && rm -rf *.whl

COPY ./*.whl ./
RUN pip install *.whl && rm -rf *.whl


FROM base as final

ARG COMMIT_SHA
ARG COMMIT_REF

ENV HOME=/app

COPY --from=builder /venv /venv
COPY --from=builder ${HOME}/.cmdstan ${HOME}/.cmdstan

ENV COMMIT_SHA=${COMMIT_SHA} \
  COMMIT_REF=${COMMIT_REF} \
  PATH="${VIRTUAL_ENV}/bin:${PATH}"

RUN echo VIRTUAL_ENV=${VIRTUAL_ENV} PATH=${PATH}

# main entrypoint
ENTRYPOINT ["python", "-m", "our_package.main"]
CMD []
