Skip to content

Commit

Permalink
Add new version of coffea_casa based on suggestion of Dask docs
Browse files Browse the repository at this point in the history
  • Loading branch information
oshadura committed Aug 17, 2020
1 parent ed624db commit f36f589
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 399 deletions.
13 changes: 8 additions & 5 deletions coffea_casa/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import

from . import version
#from .coffea_casa import CoffeaCasaCluster
from .coffea_casa_method import CoffeaCasaCluster

__version__ = version.__version__
import dask
import yaml
import os
from .coffea_casa import CoffeaCasaCluster
from .coffea_casa_method import coffea_casa_cluster
from .version import __version__
from . import config
314 changes: 148 additions & 166 deletions coffea_casa/coffea_casa.py
Original file line number Diff line number Diff line change
@@ -1,188 +1,170 @@
"""CoffeaCasaCluster class
"""
import os
import re
import sys

from pathlib import Path
import dask
from dask_jobqueue.core import cluster_parameters, job_parameters
from dask_jobqueue.htcondor import HTCondorCluster, HTCondorJob
from distributed.security import Security

from distributed.utils import import_term

# Port settings
DEFAULT_SCHEDULER_PORT = 8787
DEFAULT_DASHBOARD_PORT = 8786
DEFAULT_CONTAINER_PORT = 8787

class CoffeaCasaJob(HTCondorJob):
_script_template = """
%(shebang)s
# Security settings for Dask scheduler
SECRETS_DIR = Path('/etc/cmsaf-secrets')
CA_FILE = SECRETS_DIR / "ca.pem"
CERT_FILE = SECRETS_DIR / "hostcert.pem"
# XCache
XCACHE_FILE = SECRETS_DIR / "xcache_token"

%(job_header)s

Environment = "%(quoted_environment)s"
Arguments = "%(quoted_arguments)s"
Executable = %(executable)s
def merge_dicts(*dict_args):
    """Shallow-merge any number of dictionaries into one new dict.

    When the same key appears in several inputs, the value from the
    latest dictionary wins (left-to-right precedence).

    Parameters
    ----------
    *dict_args
        Zero or more mappings to combine; the inputs are not modified.

    Returns
    -------
    dict
        A new dictionary holding the merged key/value pairs.
    """
    # A single comprehension walks the inputs left-to-right, so later
    # occurrences of a key naturally overwrite earlier ones.
    return {
        key: value
        for mapping in dict_args
        for key, value in mapping.items()
    }

Queue
""".lstrip()

class CoffeaCasaJob(HTCondorJob):
"""
"""
submit_command = "condor_submit -spool"
config_name = "coffea-casa"

def __init__(
self,
scheduler=None,
name=None,
worker_image=None,
protocol=None,
scheduler_port=8787,
config_name=None,
**base_class_kwargs
):
# Instantiate args and parameters from parent abstract class
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)
# Docker: Dask worker Docker image is "must".
if worker_image is None:
raise ValueError(
"You must specify worker image for Coffea-casa analysis ``worker_image='coffeateam/coffea-casa:latest'``"

class CoffeaCasaCluster(HTCondorCluster):
job_cls = CoffeaCasaJob
config_name = "coffea-casa"

def __init__(self,
*,
security=None,
worker_image=None,
scheduler_options=None,
scheduler_port=DEFAULT_SCHEDULER_PORT,
dashboard_port=DEFAULT_DASHBOARD_PORT,
**job_kwargs
):
"""
Parameters
----------
worker_image
Defaults to ``coffeateam/coffea-casa-analysis``
(https://hub.docker.com/r/coffeateam/coffea-casa-analysis).
scheduler_port
dashboard_port
job_kwargs
"""
job_kwargs = self._modify_job_kwargs(
job_kwargs,
security=security,
worker_image=worker_image,
scheduler_port=scheduler_port,
dashboard_port=dashboard_port,
)
if worker_image:
self.worker_image = worker_image
# Scheduler port:
if scheduler_port:
self.scheduler_port = scheduler_port
# Networking: we need to have external IP (manadatory for Dask worker<->scheduler comunication)
# Instantiate args and parameters from parent abstract class
super().__init__(**job_kwargs)

@classmethod
def _modify_job_kwargs(cls,
job_kwargs,
*,
security=None,
worker_image=None,
scheduler_options=None,
scheduler_port=DEFAULT_SCHEDULER_PORT,
dashboard_port=DEFAULT_DASHBOARD_PORT,
):
job_config = job_kwargs.copy()
## Security settings
# todo(oksana) -> I can't run local tests with my own Security object
if security:
job_config["protocol"] = 'tls://'
job_config["security"] = security
# We hope we have files locally and it is used
# for local tests only for now.
files = ""
else:
if CA_FILE.is_file() and CERT_FILE.is_file():
job_config["protocol"] = 'tls://'
job_config["security"] = cls.security()
input_files = [CA_FILE, CERT_FILE, XCACHE_FILE]
files = ", ".join(str(path) for path in input_files)
else:
job_config["protocol"] = 'tcp://'
input_files = [XCACHE_FILE]
files = ", ".join(str(path) for path in input_files)
## Networking settings
try:
external_ip = os.environ['HOST_IP']
except KeyError:
print("Please check with system administarator why external IP was not assigned for you.")
sys.exit(1)
# Networking: Special string case for external IP (external_ip_string will be used for scheduler_options
if external_ip:
external_address = str(protocol)+str(external_ip)+':'+str(scheduler_port)
external_ip_string = '"'+ external_address+'"'
self.external_address = external_address
self.external_ip = external_ip

# HTCondor: we have a custom job_extra for CoffeaCasaCluster
# Networking: add DaskScheduleAdress. For this we extend job header with job_header_extra
job_extra_default = {
"universe": "docker",
"docker_image": self.worker_image,
"container_service_names": "dask",
"dask_container_port": "8787",
"should_transfer_files": "YES",
"when_to_transfer_output": "ON_EXIT",
"transfer_input_files": "/etc/cmsaf-secrets/xcache_token,/etc/cmsaf-secrets/ca.pem,/etc/cmsaf-secrets/hostcert.pem",
"encrypt_input_files": "/etc/cmsaf-secrets/xcache_token,/etc/cmsaf-secrets/ca.pem,/etc/cmsaf-secrets/hostcert.pem",
"Stream_Output": False,
"Stream_Error": False,
"+DaskSchedulerAddress": external_ip_string,
}
self.job_extra_default=job_extra_default
self.job_header_dict.update(self.job_extra_default)

# Lets check if somebody else added already a job_extra
job_extra = base_class_kwargs.get("job_extra", None)
# lets check dask config environment
if job_extra is None:
self.job_extra = dask.config.get(
"jobqueue.%s.job-extra" % self.config_name, {})
else:
self.job_extra = job_extra

if self.job_extra:
self.job_header_dict.update(self.job_extra)

# Env_extra
env_extra = base_class_kwargs.get("env_extra", None)
# lets check dask config environment
if env_extra is None:
env_extra = dask.config.get(
"jobqueue.%s.env-extra" % self.config_name, default=[]
)
self.env_dict = self.env_lines_to_dict(env_extra)


# Update job script
def job_script(self):
""" Update a job submission script """
quoted_arguments = super(CoffeaCasaJob, self).quote_arguments(["-c", self._command_template])
quoted_environment = super(CoffeaCasaJob, self).quote_environment(self.env_dict)
job_header_lines = "\n".join(
"%s = %s" % (k, v) for k, v in self.job_header_dict.items()
address_list = [external_ip, DEFAULT_SCHEDULER_PORT]
external_address_short = ":".join(str(item) for item in address_list)
###
full_address_list = [job_config["protocol"], external_address_short]
external_address = "".join(str(item) for item in full_address_list)
external_ip_string = '"' + external_address + '"'
## Scheduler settings
# we need to pass and check protocol for scheduler
# (format should be not 'tls://'' but 'tls')
scheduler_protocol = job_config["protocol"]
job_config["scheduler_options"] = merge_dicts(
{
"port": scheduler_port,
"dashboard_address": str(dashboard_port),
"protocol": scheduler_protocol.replace("://",""),
"external_address": external_address_short,
},
job_kwargs.get(
"scheduler_options",
dask.config.get(f"jobqueue.{cls.config_name}.scheduler-options"),
),
)
return self._script_template % {
"shebang": self.shebang,
"job_header": job_header_lines,
"quoted_environment": quoted_environment,
"quoted_arguments": quoted_arguments,
"executable": self.executable,
}


class CoffeaCasaCluster(HTCondorCluster):
__doc__ = """ Launch Dask on an Coffea Casa k8s HTCondor cluster
Parameters
----------
cores=24, memory="4GB", disk="4GB", worker_image="coffeateam/coffea-casa:latest"
disk : str
Total amount of disk per job
job_extra : dict
Extra submit file attributes for the job
{job}
{cluster}
Examples
--------
>>> from coffea_casa.coffea_casa import CoffeaCasaCluster
>>> cluster = CoffeaCasaCluster(cores=24, memory="4GB", disk="4GB", worker_image="coffeateam/coffea-casa:latest")
>>> cluster.scale(jobs=10) # ask for 10 jobs
>>> from dask.distributed import Client
>>> client = Client(cluster)
This also works with adaptive clusters. This automatically launches and kill workers based on load.
>>> cluster.adapt(maximum_jobs=20)
""".format(
job=job_parameters, cluster=cluster_parameters
)
job_cls=CoffeaCasaJob

#scheduler_options=None,
def __init__(
self,
job_cls = CoffeaCasaJob,
scheduler_options=None,
dashboard_address=8786,
scheduler_spec=None,
protocol = "tcp://",
security=None,
**job_class_kwargs
):
# Instantiate args and parameters from parent abstract class
super().__init__(
scheduler_spec=scheduler_spec, scheduler_options=scheduler_options, security=security, protocol=protocol, **job_class_kwargs
## Job extra settings (HTCondor ClassAd)
job_config["job_extra"] = merge_dicts(
{
"universe": "docker",
"docker_image": worker_image or dask.config.get(f"jobqueue.{cls.config_name}.worker-image"),
},
{
"container_service_names": "dask",
"dask_container_port": DEFAULT_CONTAINER_PORT,
},
{"transfer_input_files": files},
{"encrypt_input_files": files},
{"transfer_output_files": '""'},
{"when_to_transfer_output": '"ON_EXIT"'},
{"should_transfer_files": '"YES"'},
{"Stream_Output": 'False'},
{"Stream_Error": 'False'},
{"+DaskSchedulerAddress": external_ip_string},
job_kwargs.get("job_extra", dask.config.get(f"jobqueue.{cls.config_name}.job-extra")),
)

# Add Custom scheduler options
self.protocol = protocol
# Protocol: We need to have for dask not "tls://" but "tls"
#if re.match(r'://', self.protocol):
self.protocol_dask = self.protocol.replace("://","")
#else:
# self.protocol_dask = self.protocol

self.dashboard_address = dashboard_address

default_job_cls = getattr(type(self), "job_cls", None)

scheduler_options_extra={
"protocol": self.protocol_dask,
"dashboard_address": self.dashboard_address,
"port": "8787",
# how to get external address from class?

#"external_address": job_class_kwargs.get("external_address", None),
}
self.scheduler_spec["options"].update(scheduler_options_extra)
print(job_config)
return job_config

print(self.scheduler_spec)

if scheduler_options:
scheduler_options = scheduler_options
self.scheduler_spec["options"].update(scheduler_options)
@classmethod
def security(cls):
    """Build a Dask ``Security`` object from the cluster's TLS secrets.

    The CA certificate and the host certificate are read from the
    module-level ``CA_FILE`` / ``CERT_FILE`` paths. The single host
    certificate file is reused as both certificate and private key for
    the worker, client, and scheduler roles, and encryption is made
    mandatory for all connections.

    Returns
    -------
    distributed.security.Security
        Security configuration requiring TLS on every channel.
    """
    cert = str(CERT_FILE)
    # The same PEM file serves as cert and key for every role.
    tls_roles = {
        "tls_worker_cert": cert,
        "tls_worker_key": cert,
        "tls_client_cert": cert,
        "tls_client_key": cert,
        "tls_scheduler_cert": cert,
        "tls_scheduler_key": cert,
    }
    return Security(
        tls_ca_file=str(CA_FILE),
        require_encryption=True,
        **tls_roles,
    )
Loading

0 comments on commit f36f589

Please sign in to comment.