Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce file_mounts_sync_continuously cluster option #9544

Merged
merged 13 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor so that runtime_hash retains previous behavior
runtime_hash is almost identical to what it was before this PR. It is used to determine whether setup_commands need to run.
file_mounts_contents_hash is an additional hash of the file_mounts content that is used to detect when only file syncing has to occur.

Note: the runtime_hash value will differ from what it was before this PR because, as a performance optimization, we hash the hash of the file_mounts contents.
  • Loading branch information
alanwguo committed Jul 22, 2020
commit 094262c4b9ac514e0c2cec866801a4cb62981236
9 changes: 5 additions & 4 deletions python/ray/autoscaler/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ def reload_config(self, errors_fatal=False):
new_config["worker_setup_commands"],
new_config["worker_start_ray_commands"],
],
use_cached_contents_hash=not file_mounts_sync_continuously,
generate_file_mounts_contents_hash=
file_mounts_sync_continuously,
)
self.config = new_config
self.launch_hash = new_launch_hash
Expand Down Expand Up @@ -328,8 +329,9 @@ def files_up_to_date(self, node_id):
applied_file_mounts_contents_hash = node_tags.get(
TAG_RAY_FILE_MOUNTS_CONTENTS)
if (applied_config_hash != self.runtime_hash
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason we need to check the inner hash and not just the runtime one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when the contents of the head node's file_mounts change, the files will be synced to the worker nodes.

This behavior should be the same as before, since previously the contents hash was included in the runtime hash.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One behaviour I'm concerned about is whether "ray up" will still work if continuous_sync mode is on. If I understand correctly, that won't update the cluster if only files have changed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. In that case the rsync will still happen but the setup commands won't run. I'll make a change so that setup commands are skipped only when update is called in the monitor.py flow.

or applied_file_mounts_contents_hash !=
self.file_mounts_contents_hash):
or (self.file_mounts_contents_hash is not None
and self.file_mounts_contents_hash !=
applied_file_mounts_contents_hash)):
logger.info("StandardAutoscaler: "
"{}: Runtime state is ({},{}), want ({},{})".format(
node_id, applied_config_hash,
Expand Down Expand Up @@ -405,7 +407,6 @@ def spawn_updater(self, node_id, init_commands, ray_start_commands):
ray_start_commands=with_head_node_ip(ray_start_commands),
runtime_hash=self.runtime_hash,
file_mounts_contents_hash=self.file_mounts_contents_hash,
run_setup_commands_on_file_mounts_change=False,
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config.get("docker"))
Expand Down
30 changes: 10 additions & 20 deletions python/ray/autoscaler/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def __init__(self,
ray_start_commands,
runtime_hash,
file_mounts_contents_hash,
run_setup_commands_on_file_mounts_change=True,
process_runner=subprocess,
use_internal_ip=False,
docker_config=None):
Expand All @@ -58,10 +57,6 @@ def __init__(self,
self.ray_start_commands = ray_start_commands
self.runtime_hash = runtime_hash
self.file_mounts_contents_hash = file_mounts_contents_hash
# Whether setup commands should be run when file_mounts contents
# have changed but otherwise the cluster config has not changed.
self.run_setup_commands_on_file_mounts_change = \
run_setup_commands_on_file_mounts_change
self.auth_config = auth_config

def run(self):
Expand Down Expand Up @@ -142,15 +137,12 @@ def do_update(self):

node_tags = self.provider.node_tags(self.node_id)
logger.debug("Node tags: {}".format(str(node_tags)))
# file_mounts_content_hash will only change whenever the user restarts
# or updates their cluster with `get_or_create_head_node` unless
# continuous mode is on. In that case, the file_mounts_content_hash
# will be recalculated at a regular interval while the cluster is
# running.
if node_tags.get(
TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash and node_tags.get(
TAG_RAY_FILE_MOUNTS_CONTENTS
) == self.file_mounts_contents_hash:
# runtime_hash will only change whenever the user restarts
# or updates their cluster with `get_or_create_head_node`
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash and (
self.file_mounts_contents_hash is None
or node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS) ==
self.file_mounts_contents_hash):
logger.info(self.log_prefix +
"{} already up-to-date, skip to ray start".format(
self.node_id))
Expand All @@ -159,12 +151,10 @@ def do_update(self):
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SYNCING_FILES})
self.sync_file_mounts(self.rsync_up)

# Skip setup commands when only file_mounts contents change unless
# run_setup_commands_on_file_mounts_change is True.
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) != self.runtime_hash or (
node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS) !=
self.file_mounts_contents_hash
and self.run_setup_commands_on_file_mounts_change):
# Only run setup commands if runtime_hash has changed because
# we don't want to run setup_commands every time the head node
# file_mounts folders have changed.
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) != self.runtime_hash:
# Run init commands
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP})
Expand Down
36 changes: 28 additions & 8 deletions python/ray/autoscaler/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,20 @@ def hash_launch_conf(node_conf, auth):
_hash_cache = {}


def hash_runtime_conf(file_mounts, extra_objs, use_cached_contents_hash=True):
config_hasher = hashlib.sha1()
def hash_runtime_conf(file_mounts,
extra_objs,
generate_file_mounts_contents_hash=False):
"""Returns two hashes, a runtime hash and file_mounts_content hash.

The runtime hash is used to determine if the configuration or file_mounts
contents have changed. It is used at launch time (ray up) to determine if
a restart is needed.

The file_mounts_content hash is used to determine if the file_mounts
contents have changed. It is used at monitor time to determine if
additional file syncing is needed.
"""
runtime_hasher = hashlib.sha1()
contents_hasher = hashlib.sha1()

def add_content_hashes(path):
Expand All @@ -128,12 +140,20 @@ def add_hash_of_file(fpath):
conf_str = (json.dumps(file_mounts, sort_keys=True).encode("utf-8") +
json.dumps(extra_objs, sort_keys=True).encode("utf-8"))

# Only hash the files once, unless use_cached_contents_hash is false.
if not use_cached_contents_hash or conf_str not in _hash_cache:
config_hasher.update(conf_str)
# Only generate a contents hash if generate_contents_hash is true or
# if we need to generate the runtime_hash
if conf_str not in _hash_cache or generate_file_mounts_contents_hash:
for local_path in sorted(file_mounts.values()):
add_content_hashes(local_path)
_hash_cache[conf_str] = (config_hasher.hexdigest(),
contents_hasher.hexdigest())
contents_hash = contents_hasher.hexdigest()

return _hash_cache[conf_str]
# Generate a new runtime_hash if its not cached
if conf_str not in _hash_cache:
runtime_hasher.update(conf_str)
runtime_hasher.update(contents_hash)
_hash_cache[conf_str] = runtime_hasher.hexdigest()

else:
contents_hash = None

return (_hash_cache[conf_str], contents_hash)
27 changes: 27 additions & 0 deletions python/ray/tests/test_autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ def testContinuousFileMounts(self):
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})

for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
Expand All @@ -1130,6 +1131,7 @@ def testContinuousFileMounts(self):
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})

for i in [0, 1]:
runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
Expand Down Expand Up @@ -1159,6 +1161,7 @@ def testFileMountsNonContinuous(self):
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})

for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
Expand All @@ -1176,11 +1179,35 @@ def testFileMountsNonContinuous(self):
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})

for i in [0, 1]:
runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_not_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))

# Simulate a second `ray up` call
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)

autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
autoscaler.update()
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})

for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))



if __name__ == "__main__":
import sys
Expand Down