[WIP] add docker worker #374

Open
wants to merge 6 commits into base: master
7 changes: 7 additions & 0 deletions ramp-engine/ramp_engine/local/__init__.py
@@ -0,0 +1,7 @@
from ._conda import CondaEnvWorker
from ._docker import DockerWorker

__all__ = [
"CondaEnvWorker",
"DockerWorker",
]
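For context, a minimal sketch of how the two workers are meant to be imported and instantiated once this subpackage is in place (the directory paths, environment name and docker image below are placeholders, not taken from the diff):

from ramp_engine.local import CondaEnvWorker, DockerWorker

# Both workers share the BaseWorker constructor: a configuration
# dictionary and the name of the submission to train.
config = {
    'kit_dir': 'ramp-kits/iris',
    'data_dir': 'ramp-kits/iris',
    'submissions_dir': 'ramp-kits/iris/submissions',
    'logs_dir': 'ramp-kits/iris/log',
    'predictions_dir': 'ramp-kits/iris/predictions',
    'conda_env': 'base',
}
conda_worker = CondaEnvWorker(config=config, submission='starting_kit')
docker_worker = DockerWorker(
    config={**config, 'docker_image': 'continuumio/miniconda3'},
    submission='starting_kit',
)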
@@ -5,7 +5,7 @@
import subprocess
from datetime import datetime

from .base import BaseWorker, _get_traceback
from ..base import BaseWorker, _get_traceback

logger = logging.getLogger('RAMP-WORKER')

@@ -47,31 +47,20 @@ class CondaEnvWorker(BaseWorker):
* 'finished': the worker has finished training the submission.
* 'collected': the results of the training have been collected.
"""
def __init__(self, config, submission):
super().__init__(config=config, submission=submission)

def setup(self):
"""Set up the worker.

The worker will find the path to the conda environment to use using
the configuration passed when instantiating the worker.
"""
# sanity check for the configuration variable
for required_param in ('kit_dir', 'data_dir', 'submissions_dir',
'logs_dir', 'predictions_dir'):
self._check_config_name(self.config, required_param)
# find the path to the conda environment
env_name = self.config.get('conda_env', 'base')
def _find_conda_env_bin_path(self, config, cmd):
"""Find the `bin` path of a `conda` environment."""
env_name = config.get('conda_env', 'base')
proc = subprocess.Popen(
["conda", "info", "--envs", "--json"],
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
stdout, _ = proc.communicate()
conda_info = json.loads(stdout)

if env_name == 'base':
self._python_bin_path = os.path.join(conda_info['envs'][0], 'bin')
python_bin_path = os.path.join(conda_info['envs'][0], 'bin')
else:
envs_path = conda_info['envs'][1:]
if not envs_path:
@@ -83,14 +72,32 @@ def setup(self):
for env in envs_path:
if env_name == os.path.split(env)[-1]:
is_env_found = True
self._python_bin_path = os.path.join(env, 'bin')
python_bin_path = os.path.join(env, 'bin')
break
if not is_env_found:
self.status = 'error'
raise ValueError('The specified conda environment {} does not '
'exist. You need to create it.'
.format(env_name))
super(CondaEnvWorker, self).setup()
return python_bin_path

def setup(self):
"""Set up the worker.

The worker will find the path to the conda environment to use from
the configuration passed when instantiating the worker.
"""
# sanity check for the configuration variable
for required_param in ('kit_dir', 'data_dir', 'submissions_dir',
'logs_dir', 'predictions_dir'):
self._check_config_name(self.config, required_param)
self._log_dir = os.path.join(self.config['logs_dir'], self.submission)
if not os.path.exists(self._log_dir):
os.makedirs(self._log_dir)
self._python_bin_path = self._find_conda_env_bin_path(
self.config, ["conda", "info", "--envs", "--json"]
)
super().setup()

def teardown(self):
"""Remove the predictions stores within the submission."""
@@ -126,35 +133,37 @@ def check_timeout(self):
def timeout(self):
return self.config.get('timeout', 7200)

def launch_submission(self):
"""Launch the submission.

Basically, it comes to run ``ramp_test_submission`` using the conda
environment given in the configuration. The submission is launched in
a subprocess to free to not lock the Python main process.
"""
cmd_ramp = os.path.join(self._python_bin_path, 'ramp-test')
def _launch_ramp_test_submission(self, cmd):
if self.status == 'running':
raise ValueError('Wait for the submission to be processed before '
'launching a new one.')
self._log_dir = os.path.join(self.config['logs_dir'], self.submission)
if not os.path.exists(self._log_dir):
os.makedirs(self._log_dir)
self._log_file = open(os.path.join(self._log_dir, 'log'), 'wb+')
cmd_ramp = cmd + [
'--submission', self.submission,
'--ramp-kit-dir', self.config['kit_dir'],
'--ramp-data-dir', self.config['data_dir'],
'--ramp-submission-dir', self.config['submissions_dir'],
'--save-output',
'--ignore-warning'
]
self._proc = subprocess.Popen(
[cmd_ramp,
'--submission', self.submission,
'--ramp-kit-dir', self.config['kit_dir'],
'--ramp-data-dir', self.config['data_dir'],
'--ramp-submission-dir', self.config['submissions_dir'],
'--save-output',
'--ignore-warning'],
cmd_ramp,
stdout=self._log_file,
stderr=self._log_file,
)
super().launch_submission()
self._start_date = datetime.utcnow()

def launch_submission(self):
"""Launch the submission.

Basically, it comes down to running ``ramp-test`` using the conda
environment given in the configuration. The submission is launched in
a subprocess so that it does not block the main Python process.
"""
cmd_ramp = os.path.join(self._python_bin_path, 'ramp-test')
self._launch_ramp_test_submission(cmd_ramp)
super().launch_submission()

def collect_results(self):
"""Collect the results after that the submission is completed.

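The point of factoring the lookup into the ``_find_conda_env_bin_path`` helper is that ``DockerWorker`` can pass a ``docker exec``-prefixed command instead of a plain ``conda`` call. A standalone sketch of the lookup logic it implements (the function name and error message wording here are illustrative, not part of the diff):

import json
import os
import subprocess

def find_conda_env_bin_path(env_name='base'):
    # Ask conda for the list of environments as JSON.
    proc = subprocess.Popen(
        ["conda", "info", "--envs", "--json"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    stdout, _ = proc.communicate()
    conda_info = json.loads(stdout)
    # conda reports the base environment first; named environments follow.
    if env_name == 'base':
        return os.path.join(conda_info['envs'][0], 'bin')
    for env in conda_info['envs'][1:]:
        if os.path.split(env)[-1] == env_name:
            return os.path.join(env, 'bin')
    raise ValueError('The conda environment {} does not exist.'
                     .format(env_name))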
136 changes: 136 additions & 0 deletions ramp-engine/ramp_engine/local/_docker.py
@@ -0,0 +1,136 @@
import logging
import os
import subprocess

from ..base import BaseWorker
from ._conda import CondaEnvWorker

logger = logging.getLogger('RAMP-WORKER')


class DockerWorker(CondaEnvWorker):
"""Local worker which will run a submission within a docker container.

The submission is trained inside a docker container, using `conda` to
manage the environment within the container.

Parameters
----------
config : dict
Configuration dictionary to set the worker. The following parameters
should be set:

* 'conda_env': the name of the conda environment to use. If not
specified, the base environment will be used.
* 'kit_dir': path to the directory of the RAMP kit;
* 'data_dir': path to the directory of the data;
* 'submissions_dir': path to the directory containing the
submissions;
* `logs_dir`: path to the directory where the log of the
submission will be stored;
* `predictions_dir`: path to the directory where the
predictions of the submission will be stored.
* 'docker_image': the docker image inside which the submission
will be run;
* 'conda_dir': path to a conda installation to bind-mount (read-only)
inside the container and prepend to its PATH (optional);
* 'timeout': timeout after a given number of seconds when
running the worker. If not provided, a default of 7200
is used.
submission : str
Name of the RAMP submission to be handled by the worker.

Attributes
----------
status : str
The status of the worker. It should be one of the following states:

* 'initialized': the worker has been instantiated.
* 'setup': the worker has been set up.
* 'running': the worker is training the submission.
* 'finished': the worker has finished training the submission.
* 'collected': the results of the training have been collected.
"""
def setup(self):
# sanity check for the configuration variable
for required_param in ('kit_dir', 'data_dir', 'submissions_dir',
'logs_dir', 'predictions_dir'):
self._check_config_name(self.config, required_param)
self._log_dir = os.path.join(self.config['logs_dir'], self.submission)
if not os.path.exists(self._log_dir):
os.makedirs(self._log_dir)
# get the path to the conda installation specified in the configuration
conda_path = self.config.get('conda_dir', None)
docker_image = self.config['docker_image']
# start the docker container from the configured image
docker_cmd = [
"docker", "run", "-itd", "--rm", "--name", f"{self.submission}"
]
if conda_path is not None:
# mount the conda path
docker_cmd += [
"--mount",
f'type=bind,source="{conda_path}",target="{conda_path}",readonly'
]
# add it to PATH
docker_cmd += ["--env", 'PATH="{conda_path}:$PATH"']
# add ramp-kit directory
mounted_dir = []
for key in ["kit_dir", "data_dir", "submissions_dir", "logs_dir"]:
mount_dir = self.config[key]
if mount_dir not in mounted_dir:
mounted_dir.append(mount_dir)
# docker_cmd += [
# "--mount",
# r"type=bind,source={},target={}".format(
# mount_dir, mount_dir
# )
# ]
docker_cmd += [
"-v", "{}:{}".format(mount_dir, mount_dir)
]
docker_cmd += [f'{docker_image}']
proc = subprocess.Popen(
docker_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate()
if stderr:
logger.error(stderr.decode())
raise RuntimeError(stderr.decode())
self.running_container_hash = stdout.decode().split("\n")[0]
# find the path to the environment
self._docker_exec_cmd = ["docker", "exec", "-it"]
if conda_path is not None:
# add it to PATH
self._docker_exec_cmd += [
"--env", 'PATH="{conda_path}:$PATH"'
]
self._docker_exec_cmd += [
"--workdir", self.config['kit_dir']
]
self._docker_exec_cmd += ["-u", "root:root"]
self._docker_exec_cmd += [f"{self.submission}", "/bin/bash", "-c"]
cmd = self._docker_exec_cmd + ["conda info --envs --json"]
self._python_bin_path = self._find_conda_env_bin_path(self.config, cmd)
BaseWorker.setup(self)

def launch_submission(self):
logger.debug('python bin path: %s', self._python_bin_path)
cmd = self._docker_exec_cmd + [
'ramp-test'
]
self._launch_ramp_test_submission(cmd)
BaseWorker.launch_submission(self)

def collect_results(self):
BaseWorker.collect_results(self)
if self.status in ['finished', 'running', 'timeout']:
# communicate() will wait for the process to be completed
self._proc.communicate()
self._log_file.close()
mount_dir = os.path.join(os.getcwd(), self.config["kit_dir"])

def teardown(self):
"""Remove the predictions stores within the submission."""
subprocess.run(
["docker", "container", "stop", self.running_container_hash],
)
super().teardown()
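Taken together, ``setup``, ``launch_submission`` and ``teardown`` drive roughly the following container lifecycle. The sketch below spells out the docker commands being assembled (directory, image and submission names are placeholders, and only some of the ``ramp-test`` flags appended by ``_launch_ramp_test_submission`` are shown):

import subprocess

submission = 'starting_kit'
docker_image = 'continuumio/miniconda3'
kit_dir = '/path/to/ramp-kit'

# setup(): start a detached, auto-removed container named after the
# submission, bind-mounting the kit/data/submissions/logs directories.
subprocess.run(
    ["docker", "run", "-itd", "--rm", "--name", submission,
     "-v", "{}:{}".format(kit_dir, kit_dir),
     docker_image],
    check=True,
)

# launch_submission(): run ramp-test inside the running container
# through ``docker exec ... /bin/bash -c``.
subprocess.run(
    ["docker", "exec", "--workdir", kit_dir, "-u", "root:root",
     submission, "/bin/bash", "-c",
     "ramp-test --submission {} --save-output --ignore-warning".format(
         submission)],
    check=True,
)

# teardown(): stop the container; --rm removes it automatically.
subprocess.run(
    ["docker", "container", "stop", submission],
    check=True,
)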
57 changes: 57 additions & 0 deletions ramp-engine/ramp_engine/tests/test_docker_worker.py
@@ -0,0 +1,57 @@
import os
import shutil

import pytest

from ramp_engine.local import DockerWorker


@pytest.fixture
def get_docker_worker():
def _create_worker(
submission_name, conda_env='base',
docker_image="continuumio/miniconda3",
):
module_path = os.path.dirname(__file__)
config = {
'kit_dir': os.path.join(module_path, 'kits', 'iris'),
'data_dir': os.path.join(module_path, 'kits', 'iris'),
'submissions_dir': os.path.join(module_path, 'kits',
'iris', 'submissions'),
'logs_dir': os.path.join(module_path, 'kits', 'iris', 'log'),
'predictions_dir': os.path.join(
module_path, 'kits', 'iris', 'predictions'
),
'conda_env': conda_env,
'docker_image': docker_image,
}
return DockerWorker(config=config, submission=submission_name)
return _create_worker


def _remove_directory(worker):
output_training_dir = os.path.join(
worker.config['kit_dir'], 'submissions', worker.submission,
'training_output'
)
for directory in (output_training_dir,
worker.config['logs_dir'],
worker.config['predictions_dir']):
if os.path.exists(directory):
shutil.rmtree(directory)


@pytest.mark.parametrize("submission", ('starting_kit', 'random_forest_10_10'))
def test_docker_worker(submission, get_docker_worker):
worker = get_docker_worker(submission)
try:
assert worker.status == 'initialized'
worker.setup()
assert worker.status == 'setup'
worker.launch_submission()
worker.collect_results()
finally:
# remove all directories that we potentially created
worker._status = 'collected'
worker.teardown()
# _remove_directory(worker)
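Because the fixture returns a factory, a test can also make the conda environment and docker image explicit or swap them out; a short sketch (the test name is illustrative, and the keyword values shown are just the factory defaults spelled out):

def test_docker_worker_base_env(get_docker_worker):
    worker = get_docker_worker(
        'starting_kit',
        conda_env='base',
        docker_image='continuumio/miniconda3',
    )
    assert worker.status == 'initialized'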
2 changes: 1 addition & 1 deletion ramp-utils/ramp_utils/tests/test_cli.py
@@ -75,7 +75,7 @@ def test_setup_init_event(deployment_dir):
shutil.rmtree(deployment_dir, ignore_errors=True)


def test_deploy_ramp_event():
def test_deploy_ramp_event(deployment_dir):
runner = CliRunner()
result = runner.invoke(main, ['deploy-event',
'--config', database_config_template(),