NFS-safe Dependency Manager #3861

Merged
merged 75 commits into master from flufl on Apr 4, 2022

Commits (75)
b0f597b
nfs locking for dependency manager
teetone Oct 26, 2021
e0e3bc0
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Oct 26, 2021
80facb0
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Nov 8, 2021
ae2b7c2
NFS-safe DependencyManager
teetone Nov 23, 2021
16c69f2
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Nov 23, 2021
195529e
fix formatting
teetone Nov 23, 2021
853ec49
basic test
teetone Nov 23, 2021
200dfa2
Tests
teetone Nov 24, 2021
19e723d
NFS-safe
teetone Nov 29, 2021
3a1ba09
Single global lock + update comprehensive test
teetone Nov 29, 2021
5577ea0
cleanup
teetone Nov 29, 2021
cd45659
cleanup
teetone Nov 29, 2021
eda1e74
disable test for the ratarmountcore issue
teetone Nov 29, 2021
19bc723
test
teetone Nov 29, 2021
ba1578d
test
teetone Nov 29, 2021
7bc1009
debug
teetone Nov 29, 2021
6b643f2
make debugging easier
teetone Nov 30, 2021
d7a0263
fix test
teetone Nov 30, 2021
734879e
debug
teetone Nov 30, 2021
5a6f4b5
log when failing to load or commit state to JSON file
teetone Nov 30, 2021
b51c1b2
log when failing to load or commit state to JSON file
teetone Nov 30, 2021
3e657c4
more logging
teetone Nov 30, 2021
877d6c5
Direct I/O for state committer
teetone Dec 1, 2021
9e30947
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Dec 1, 2021
233df8b
Merge branch 'flufl' of https://github.com/codalab/codalab-worksheets…
teetone Dec 1, 2021
6725a89
test
teetone Dec 1, 2021
761afdb
fix formatting; increase stress for test
teetone Dec 1, 2021
efc350a
cleanup
teetone Dec 1, 2021
1a5d5df
Reduce I/O
teetone Dec 1, 2021
574a37d
more logging for job scheduling
teetone Dec 1, 2021
6f80a9d
abort download
teetone Dec 1, 2021
830bd5c
Update download status periodically
teetone Dec 1, 2021
9cd5223
Improve download performance && handle state read errors
teetone Dec 3, 2021
dec1766
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Dec 3, 2021
e5d8ea7
temp disable test
teetone Dec 3, 2021
f8d4f56
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Dec 4, 2021
3bc1f2e
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Dec 8, 2021
93c7577
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Dec 13, 2021
aaa72a2
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Dec 22, 2021
8376d23
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Jan 26, 2022
e429edc
log when its own dependency manager is in the middle of downloading a…
teetone Jan 26, 2022
22bd061
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Jan 26, 2022
b5ce6d8
debug
teetone Jan 26, 2022
840755c
Allow running multiple workers
epicfaace Jan 26, 2022
22c615d
fix worker2
epicfaace Jan 26, 2022
2df98b1
fix id of second worker
epicfaace Jan 26, 2022
c4cba5f
fix: use separate state files for each worker
epicfaace Jan 26, 2022
03b26b0
don't do debug for now
epicfaace Jan 26, 2022
0090c41
remove flag
epicfaace Jan 26, 2022
43b2737
re-add alembic
epicfaace Jan 26, 2022
224402a
make nfsdependencymanager the default
teetone Jan 26, 2022
953b2de
comment explaining unique id
teetone Jan 26, 2022
aaf03f7
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Jan 27, 2022
a96ec74
combine dependency managers
teetone Jan 28, 2022
227e754
Bump flufl.lock version to 7.0
teetone Jan 28, 2022
512b568
Rever back to older version of flufl.lock
teetone Jan 28, 2022
597ecb3
revert back to the old state_committer commit
teetone Jan 28, 2022
bfe62cf
acquire reentrant lock before flufl lock
teetone Feb 2, 2022
3541ddf
resolve merge conflicts
teetone Feb 2, 2022
5bc73ee
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Feb 6, 2022
68dba36
cleanup
teetone Feb 14, 2022
9bd39ea
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Feb 14, 2022
21000c6
fix test
teetone Feb 14, 2022
bd544ca
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Feb 25, 2022
514bb35
cleanup
teetone Feb 25, 2022
4f21a6f
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Feb 25, 2022
dddaed2
fix teest
teetone Feb 27, 2022
1a07aab
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Feb 27, 2022
fbbecc8
Update codalab/worker/dependency_manager.py
epicfaace Mar 2, 2022
b3be38e
Update codalab/worker/dependency_manager.py
epicfaace Mar 2, 2022
fcc0675
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Mar 2, 2022
821de7d
Merge branch 'flufl' of https://github.com/codalab/codalab-worksheets…
teetone Mar 2, 2022
2dbca5b
Merge branch 'master' of https://github.com/codalab/codalab-worksheet…
teetone Mar 28, 2022
d4f02d8
wrap fetch_state calls
teetone Mar 28, 2022
dd3ba54
Merge branch 'master' into flufl
mergify[bot] Apr 4, 2022
614 changes: 385 additions & 229 deletions codalab/worker/dependency_manager.py

Large diffs are not rendered by default.
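
Since the main `dependency_manager.py` diff is not rendered, here is a minimal sketch of the locking pattern the commit history describes (a single global flufl.lock plus a reentrant in-process lock guarding the shared dependencies state file, with `flufl.lock` added in `requirements.txt` below). This is not the PR's actual implementation; the class and method names are illustrative only.

```python
# Minimal sketch, NOT the actual dependency_manager.py implementation (not rendered
# above). Assumes the pattern implied by the commits "Single global lock" and
# "acquire reentrant lock before flufl lock"; names here are illustrative.
import threading
from contextlib import contextmanager

from flufl.lock import Lock  # hard-link based lock files, safe over NFS


class NFSSafeStateAccess:
    def __init__(self, state_path: str):
        self._state_path = state_path
        # In-process reentrant lock, acquired before the NFS lock.
        self._state_lock = threading.RLock()
        # One global lock file next to the JSON state file.
        self._nfs_lock = Lock(state_path + '.lock')

    @contextmanager
    def exclusive_state_access(self):
        """Serialize reads and writes of the state file across threads and machines."""
        with self._state_lock:
            self._nfs_lock.lock()
            try:
                yield self._state_path
            finally:
                self._nfs_lock.unlock()
```

Every `get`, `release`, and commit of dependency state would then run inside such a context, so multiple workers sharing one NFS-mounted work directory never write conflicting state.
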

4 changes: 3 additions & 1 deletion codalab/worker/main.py
@@ -304,7 +304,9 @@ def main():
worker = Worker(
image_manager,
dependency_manager,
os.path.join(args.work_dir, 'worker-state.json'),
# Include the worker ID in the worker state JSON path, so multiple workers
# sharing the same work directory maintain their own state.
os.path.join(args.work_dir, f'worker-state-{args.id}.json'),
args.cpuset,
args.gpuset,
args.max_memory,
33 changes: 24 additions & 9 deletions codalab/worker/state_committer.py
@@ -2,6 +2,7 @@
import os
import tempfile
import shutil

from . import pyjson


@@ -20,30 +21,44 @@ def commit(self, state):

class JsonStateCommitter(BaseStateCommitter):
def __init__(self, json_path):
self.temp_file = None
self._state_file = json_path

@property
def path(self):
return self._state_file

@property
def state_file_exists(self) -> bool:
return os.path.isfile(self._state_file)

def load(self, default=None):
"""
Loads the state from the state file. If an error occurs and `default` is provided, returns `default`; otherwise re-raises the error.
"""
try:
with open(self._state_file) as json_data:
return pyjson.load(json_data)
except (ValueError, EnvironmentError):
return dict() if default is None else default
except (ValueError, EnvironmentError) as e:
if default is not None:
logger.warning(
f"Failed to load state from {self.path} due to {e}. Returning default: {default}.",
exc_info=True,
)
return default
logger.error(f"Failed to load state from {self.path}: {e}", exc_info=True)
raise e

def commit(self, state):
""" Write out the state in JSON format to a temporary file and rename it into place """
with tempfile.NamedTemporaryFile(delete=False) as f:
try:
self.temp_file = f.name
f.write(pyjson.dumps(state).encode())
f.flush()
shutil.copyfile(self.temp_file, self._state_file)
shutil.copyfile(f.name, self._state_file)
finally:
try:
os.unlink(self.temp_file)
os.unlink(f.name)
except FileNotFoundError:
logger.error(
"Problem occurred in deleting temp file {} via os.unlink".format(
self.temp_file
)
"Problem occurred in deleting temp file {} via os.unlink".format(f.name)
)
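
For reference, a usage sketch of the revised `JsonStateCommitter` API shown in this diff; the state path and payload below are hypothetical examples, not values from the PR.

```python
# Usage sketch for the JsonStateCommitter changes above; path and payload are
# hypothetical.
from codalab.worker.state_committer import JsonStateCommitter

committer = JsonStateCommitter('/tmp/dependencies-state.json')
assert not committer.state_file_exists

# commit() writes the JSON to a temporary file first, then copies it into place,
# so readers never observe a half-written state file.
committer.commit({'runs': {}})
assert committer.state_file_exists
assert committer.path == '/tmp/dependencies-state.json'

# With a default, load() logs a warning and returns the default on read errors;
# without one, it logs an error and re-raises.
state = committer.load(default=dict())
```
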
3 changes: 2 additions & 1 deletion codalab/worker/worker.py
@@ -194,7 +194,8 @@ def save_state(self):
self.state_committer.commit(runs)

def load_state(self):
runs = self.state_committer.load()
# If the state file doesn't exist yet, have the state committer return an empty state.
runs = self.state_committer.load(default=dict())
# Retrieve the complex container objects from the Docker API
for uuid, run_state in runs.items():
if run_state.container_id:
56 changes: 36 additions & 20 deletions codalab/worker/worker_run_state.py
@@ -5,6 +5,7 @@
import threading
import time
import traceback
from typing import Dict

import codalab.worker.docker_utils as docker_utils

@@ -251,13 +252,26 @@ def mount_dependency(dependency, shared_file_system):

dependencies_ready = True
status_messages = []
dependency_keys_to_paths: Dict[DependencyKey, str] = dict()

if not self.shared_file_system:
# No need to download dependencies if we're in the shared FS,
# since they're already in our FS
for dep in run_state.bundle.dependencies:
dep_key = DependencyKey(dep.parent_uuid, dep.parent_path)
dependency_state = self.dependency_manager.get(run_state.bundle.uuid, dep_key)

try:
# Fetching dependencies from the Dependency Manager can fail.
# Just update the download status on the next iteration of this transition function.
dependency_state = self.dependency_manager.get(run_state.bundle.uuid, dep_key)
dependency_keys_to_paths[dep_key] = os.path.join(
self.dependency_manager.dependencies_dir, dependency_state.path
)
except Exception:
status_messages.append(f'Downloading dependency {dep.child_path} failed')
dependencies_ready = False
continue

if dependency_state.stage == DependencyStage.DOWNLOADING:
status_messages.append(
'Downloading dependency %s: %s done (archived size)'
@@ -311,7 +325,7 @@
if run_state.bundle_dir_wait_num_tries == 0:
message = (
"Bundle directory cannot be found on the shared filesystem. "
"Please ensure the shared fileystem between the server and "
"Please ensure the shared filesystem between the server and "
"your worker is mounted properly or contact your administrators."
)
log_bundle_transition(
@@ -344,7 +358,13 @@ def mount_dependency(dependency, shared_file_system):
for dep in run_state.bundle.dependencies:
full_child_path = os.path.normpath(os.path.join(run_state.bundle_path, dep.child_path))
to_mount = []
dependency_path = self._get_dependency_path(run_state, dep)
if self.shared_file_system:
# TODO(Ashwin): make this not fs-specific.
# On a shared FS, we know where the dependency is stored and can get the contents directly
dependency_path = os.path.realpath(os.path.join(dep.location, dep.parent_path))
else:
dep_key = DependencyKey(dep.parent_uuid, dep.parent_path)
dependency_path = dependency_keys_to_paths[dep_key]

if dep.child_path == RunStateMachine._CURRENT_DIRECTORY:
# Mount all the content of the dependency_path to the top-level of the bundle
@@ -441,19 +461,6 @@ def mount_dependency(dependency, shared_file_system):
gpuset=gpuset,
)

def _get_dependency_path(self, run_state, dependency):
if self.shared_file_system:
# TODO(Ashwin): make this not fs-specific.
# On a shared FS, we know where the dependency is stored and can get the contents directly
return os.path.realpath(os.path.join(dependency.location, dependency.parent_path))
else:
# On a dependency_manager setup, ask the manager where the dependency is
dep_key = DependencyKey(dependency.parent_uuid, dependency.parent_path)
return os.path.join(
self.dependency_manager.dependencies_dir,
self.dependency_manager.get(run_state.bundle.uuid, dep_key).path,
)

def _transition_from_RUNNING(self, run_state):
"""
1- Check run status of the docker container
@@ -608,10 +615,19 @@ def remove_path_no_fail(path):
logger.error(traceback.format_exc())
time.sleep(1)

for dep in run_state.bundle.dependencies:
if not self.shared_file_system: # No dependencies if shared fs worker
dep_key = DependencyKey(dep.parent_uuid, dep.parent_path)
self.dependency_manager.release(run_state.bundle.uuid, dep_key)
try:
# Fetching dependencies from the Dependency Manager can fail.
# Finish cleaning up on the next iteration of this transition function.
for dep in run_state.bundle.dependencies:
if not self.shared_file_system: # No dependencies if shared fs worker
dep_key = DependencyKey(dep.parent_uuid, dep.parent_path)
self.dependency_manager.release(run_state.bundle.uuid, dep_key)
except (ValueError, EnvironmentError):
# Do nothing if an error is thrown while reading from the state file
logging.exception(
f"Error reading from dependencies state file while releasing a dependency from {run_state.bundle.uuid}"
)
return run_state

# Clean up dependencies paths
for path in run_state.paths_to_remove or []:
2 changes: 1 addition & 1 deletion codalab/worker_manager/slurm_batch_worker_manager.py
@@ -268,7 +268,7 @@ def setup_codalab_worker(self, worker_id):
work_dir_prefix = Path()

worker_work_dir = work_dir_prefix.joinpath(
Path('{}-codalab-SlurmBatchWorkerManager-scratch'.format(self.username), worker_id)
Path('{}-codalab-SlurmBatchWorkerManager-scratch'.format(self.username), "workdir")
)
command = self.build_command(worker_id, str(worker_work_dir))

11 changes: 11 additions & 0 deletions docs/Worker-Managers.md
@@ -12,6 +12,17 @@ We support the following Worker Managers:
| slurm-batch | Worker manager for submitting jobs using Slurm Batch. |
| kubernetes | Worker manager for submitting jobs to a Kubernetes cluster. |

## Setting a shared cache

To use a shared cache among workers, have all the workers use the same working directory by passing
the same path to `--work-dir`. When starting a worker manager, the working directory is set with
`--worker-work-dir-prefix`. The dependency manager is NFS-safe, so the working directory can live on a network disk.

```commandline
cl-worker-manager --worker-work-dir-prefix /juice slurm-batch --cpus 4 --gpus 1 --memory-mb 16000
```

In the example worker manager command above, `/juice` is a directory on a network disk.
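
Standalone workers can share a cache in the same way. A hypothetical example (the flag names follow `codalab/worker/main.py`; the server URL, worker IDs, and paths are placeholders):

```commandline
cl-worker --server https://worksheets.example.org --id worker-1 --work-dir /juice/shared-workdir
cl-worker --server https://worksheets.example.org --id worker-2 --work-dir /juice/shared-workdir
```

Because the worker state file now includes the worker ID (`worker-state-<id>.json`, see the `main.py` change above), both workers can share the same working directory while keeping separate run state.
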

## AWS Batch Worker Manager

1 change: 1 addition & 0 deletions requirements.txt
@@ -28,3 +28,4 @@ wheel==0.35.1
urllib3==1.26.5
retry==0.9.2
spython==0.1.14
flufl.lock==6.0
147 changes: 147 additions & 0 deletions tests/unit/worker/dependency_manager_test.py
@@ -0,0 +1,147 @@
import os
import time
import unittest
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor
from unittest.mock import MagicMock

from codalab.worker.bundle_state import DependencyKey

try:
from codalab.worker.dependency_manager import DependencyManager

module_failed = False
except ImportError:
module_failed = True


class DependencyManagerTest(unittest.TestCase):
def setUp(self):
if module_failed:
self.skipTest('Issue with ratarmountcore.')

self.work_dir = tempfile.mkdtemp()
self.state_path = os.path.join(self.work_dir, "dependencies-state.json")
self.dependency_manager = DependencyManager(
commit_file=self.state_path,
bundle_service=None,
worker_dir=self.work_dir,
max_cache_size_bytes=1024,
download_dependencies_max_retries=1,
)

def tearDown(self):
shutil.rmtree(self.work_dir)

def test_get_has(self):
dependent_uuid = "0x2"
dependency_key = DependencyKey(parent_uuid="0x1", parent_path="parent")
state = self.dependency_manager.get(dependent_uuid, dependency_key)
self.assertTrue(self.dependency_manager.has(dependency_key))
self.assertEqual(state.stage, "DOWNLOADING")
self.assertEqual(state.path, "0x1_parent")
self.assertEqual(state.dependents, {dependent_uuid})

def test_release(self):
dependency_key = DependencyKey(parent_uuid="0x1", parent_path="parent")
self.dependency_manager.get("0x2", dependency_key)
state = self.dependency_manager.get("0x3", dependency_key)
# Passing in the same dependency key with a different dependent will just add the dependent
self.assertEqual(state.dependents, {"0x2", "0x3"})

# Release 0x2 as a dependent
self.dependency_manager.release("0x2", dependency_key)
with self.dependency_manager._state_lock:
dependencies = self.dependency_manager._fetch_dependencies()
state = dependencies[dependency_key]
self.assertEqual(state.dependents, {"0x3"})

# Release 0x3 as a dependent - should be left with no dependents
self.dependency_manager.release("0x3", dependency_key)
with self.dependency_manager._state_lock:
dependencies = self.dependency_manager._fetch_dependencies()
state = dependencies[dependency_key]
self.assertEqual(len(state.dependents), 0)

def test_all_dependencies(self):
dependency_key = DependencyKey(parent_uuid="0x1", parent_path="parent")
self.dependency_manager.get("0x2", dependency_key)
dependency_key = DependencyKey(parent_uuid="0x3", parent_path="parent2")
self.dependency_manager.get("0x4", dependency_key)
dependency_keys = self.dependency_manager.all_dependencies
self.assertEqual(len(dependency_keys), 2)

@unittest.skip(
"Flufl.lock doesn't seem to work on GHA for some reason, "
"even though this test passes on other machines."
)
def test_concurrency(self):
num_of_dependency_managers = 10
executor = ProcessPoolExecutor(max_workers=num_of_dependency_managers)

random_file_path = os.path.join(self.work_dir, "random_file")
with open(random_file_path, "wb") as f:
f.seek((1024 * 1024 * 1024) - 1) # 1 GB
f.write(b"\0")

futures = [
executor.submit(task, self.work_dir, self.state_path, random_file_path)
for _ in range(num_of_dependency_managers)
]
for future in futures:
print(future.result())
self.assertIsNone(future.exception())
executor.shutdown()


def task(work_dir, state_path, random_file_path):
"""
Runs the end-to-end workflow of the Dependency Manager.
Note: ProcessPoolExecutor must serialize everything before sending it to the worker,
so this function needs to be defined at the top-level.
# """
# Mock Bundle Service to return a random file object
mock_bundle_service = MagicMock()
mock_bundle_service.get_bundle_info = MagicMock(return_value={'type': "file"})
file_obj = open(random_file_path, "rb")
mock_bundle_service.get_bundle_contents = MagicMock(return_value=file_obj)

# Create and start a dependency manager
process_id = os.getpid()
print(f"{process_id}: Starting a DependencyManager...")
dependency_manager = DependencyManager(
commit_file=state_path,
bundle_service=mock_bundle_service,
worker_dir=work_dir,
max_cache_size_bytes=2048,
download_dependencies_max_retries=1,
)
dependency_manager.start()
print(f"{process_id}: Started with work directory: {work_dir}.")

# Register a run's UUID as a dependent of a parent bundle with UUID 0x1
dependency_key = DependencyKey(parent_uuid="0x1", parent_path="parent")
run_uuid = f"0x{process_id}"
state = dependency_manager.get(run_uuid, dependency_key)
assert (
run_uuid in state.dependents
), f"{process_id}: Expected {run_uuid} as one of the dependents."

# Release the run bundle as a dependent
dependency_manager.release(run_uuid, dependency_key)
dependencies = dependency_manager._fetch_dependencies()
if dependency_key in dependencies:
state = dependencies[dependency_key]
print(f"{process_id}: Checking {run_uuid} in {state.dependents}")
assert (
run_uuid not in state.dependents
), f"{process_id}: Dependent should not be in the list of dependents after unregistering."

# Keep the dependency manager running for some time to test the loop
time.sleep(30)

# Stop the Dependency Manager
print(f"{process_id}: Stopping DependencyManger...")
dependency_manager.stop()
print(f"{process_id}: Done.")
8 changes: 6 additions & 2 deletions tests/unit/worker/state_committer_test.py
@@ -21,7 +21,12 @@ def tearDown(self):

def test_path_parsing(self):
""" Simple test to ensure we don't mess up the state file path"""
self.assertEqual(self.committer._state_file, self.state_path)
self.assertEqual(self.committer.path, self.state_path)

def test_state_file_exists(self):
self.assertFalse(self.committer.state_file_exists)
self.committer.commit({'state': 'value'})
self.assertTrue(self.committer.state_file_exists)

def test_commit(self):
"""Make sure state is committed correctly"""
@@ -30,7 +35,6 @@ def test_commit(self):
self.committer.commit(test_state)
with open(self.state_path) as f:
self.assertEqual(test_state_json_str, f.read())
self.assertFalse(os.path.exists(self.committer.temp_file))

def test_load(self):
""" Make sure load loads the state file if it exists """
@@ -40,7 +40,7 @@ def test_base_command(self):

expected_command_str = (
"cl-worker --server some_server --verbose --exit-when-idle --idle-seconds 888 "
"--work-dir /some/path/some_user-codalab-SlurmBatchWorkerManager-scratch/some_worker_id "
"--work-dir /some/path/some_user-codalab-SlurmBatchWorkerManager-scratch/workdir "
"--id $(hostname -s)-some_worker_id --network-prefix cl_worker_some_worker_id_network --tag some_tag "
"--group some_group --exit-after-num-runs 8 --download-dependencies-max-retries 5 "
"--max-work-dir-size 88g --checkin-frequency-seconds 30 --shared-memory-size-gb 10 "