diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index a10cc6ed35ebf..31aea175848c6 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -7,6 +7,8 @@ flush_redis_unsafe, flush_task_and_object_metadata_unsafe, flush_finished_tasks_unsafe, flush_evicted_objects_unsafe, _flush_finished_tasks_unsafe_shard, _flush_evicted_objects_unsafe_shard) +from .gcs_flush_policy import (set_flushing_policy, GcsFlushPolicy, + SimpleGcsFlushPolicy) from .named_actors import get_actor, register_actor from .api import get, wait @@ -15,5 +17,6 @@ "flush_task_and_object_metadata_unsafe", "flush_finished_tasks_unsafe", "flush_evicted_objects_unsafe", "_flush_finished_tasks_unsafe_shard", "_flush_evicted_objects_unsafe_shard", "get_actor", "register_actor", - "get", "wait" + "get", "wait", "set_flushing_policy", "GcsFlushPolicy", + "SimpleGcsFlushPolicy" ] diff --git a/python/ray/experimental/features.py b/python/ray/experimental/features.py index 304b275baebf0..ad3ebca775607 100644 --- a/python/ray/experimental/features.py +++ b/python/ray/experimental/features.py @@ -10,7 +10,7 @@ TASK_PREFIX = b"TT:" -def flush_redis_unsafe(): +def flush_redis_unsafe(redis_client=None): """This removes some non-critical state from the primary Redis shard. This removes the log files as well as the event log from Redis. This can @@ -18,10 +18,14 @@ def flush_redis_unsafe(): of metadata in Redis. However, it will only partially address the issue as much of the data is in the task table (and object table), which are not flushed. - """ - ray.worker.global_worker.check_connected() - redis_client = ray.worker.global_worker.redis_client + Args: + redis_client: optional, if not provided then ray.init() must have been + called. + """ + if redis_client is None: + ray.worker.global_worker.check_connected() + redis_client = ray.worker.global_worker.redis_client # Delete the log files from the primary Redis shard. keys = redis_client.keys("LOGFILE:*") diff --git a/python/ray/experimental/gcs_flush_policy.py b/python/ray/experimental/gcs_flush_policy.py new file mode 100644 index 0000000000000..892c4c1110da9 --- /dev/null +++ b/python/ray/experimental/gcs_flush_policy.py @@ -0,0 +1,91 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import time + +import ray +import ray.cloudpickle as pickle + + +class GcsFlushPolicy(object): + """Experimental: a policy to control GCS flushing. + + Used by Monitor to enable automatic control of memory usage. + """ + + def should_flush(self, redis_client): + """Returns a bool, whether a flush request should be issued.""" + pass + + def num_entries_to_flush(self): + """Returns an upper bound for number of entries to flush next.""" + pass + + def record_flush(self): + """Must be called after a flush has been performed.""" + pass + + +class SimpleGcsFlushPolicy(GcsFlushPolicy): + """A simple policy with constant flush rate, after a warmup period. + + Example policy values: + flush_when_at_least_bytes 2GB + flush_period_secs 10s + flush_num_entries_each_time 10k + + This means + (1) If the GCS shard uses less than 2GB of memory, no flushing would take + place. This should cover most Ray runs. + (2) The GCS shard will only honor a flush request, if it's issued after 10 + seconds since the last processed flush. In particular this means it's + okay for the Monitor to issue requests more frequently than this param. + (3) When processing a flush, the shard will flush at most 10k entries. + This is to control the latency of each request. + + Note, flush rate == (flush period) * (num entries each time). So + applications that have a heavier GCS load can tune these params. + """ + + def __init__(self, + flush_when_at_least_bytes=(1 << 31), + flush_period_secs=10, + flush_num_entries_each_time=10000): + self.flush_when_at_least_bytes = flush_when_at_least_bytes + self.flush_period_secs = flush_period_secs + self.flush_num_entries_each_time = flush_num_entries_each_time + self.last_flush_timestamp = time.time() + + def should_flush(self, redis_client): + if time.time() - self.last_flush_timestamp < self.flush_period_secs: + return False + + used_memory = redis_client.info("memory")["used_memory"] + assert used_memory > 0 + + return used_memory >= self.flush_when_at_least_bytes + + def num_entries_to_flush(self): + return self.flush_num_entries_each_time + + def record_flush(self): + self.last_flush_timestamp = time.time() + + def serialize(self): + return pickle.dumps(self) + + +def set_flushing_policy(flushing_policy): + """Serialize this policy for Monitor to pick up.""" + if "RAY_USE_NEW_GCS" not in os.environ: + raise Exception( + "set_flushing_policy() is only available when environment " + "variable RAY_USE_NEW_GCS is present at both compile and run time." + ) + ray.worker.global_worker.check_connected() + redis_client = ray.worker.global_worker.redis_client + + serialized = pickle.dumps(flushing_policy) + redis_client.set("gcs_flushing_policy", serialized) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 1ab2ca990a3ba..13729d6c73160 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -10,6 +10,7 @@ from collections import Counter, defaultdict import ray +import ray.cloudpickle as pickle import ray.utils import redis # Import flatbuffer bindings. @@ -113,6 +114,19 @@ def __init__(self, redis_address, redis_port, autoscaling_config): else: self.autoscaler = None + # Experimental feature: GCS flushing. + self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ + self.gcs_flush_policy = None + if self.issue_gcs_flushes: + # For now, we take the primary redis server to issue flushes, + # because task table entries are stored there under this flag. + try: + self.redis.execute_command("HEAD.FLUSH 0") + except redis.exceptions.ResponseError as e: + log.info("Turning off flushing due to exception: {}".format( + str(e))) + self.issue_gcs_flushes = False + def subscribe(self, channel): """Subscribe to the given channel. @@ -534,6 +548,37 @@ def update_local_scheduler_map(self): or local_scheduler_info["NodeManagerAddress"]).split(":")[0] self.local_scheduler_id_to_ip_map[client_id] = ip_address + def _maybe_flush_gcs(self): + """Experimental: issue a flush request to the GCS. + + The purpose of this feature is to control GCS memory usage. + + To activate this feature, Ray must be compiled with the flag + RAY_USE_NEW_GCS set, and Ray must be started at run time with the flag + as well. + """ + if not self.issue_gcs_flushes: + return + if self.gcs_flush_policy is None: + serialized = self.redis.get("gcs_flushing_policy") + if serialized is None: + # Client has not set any policy; by default flushing is off. + return + self.gcs_flush_policy = pickle.loads(serialized) + + if not self.gcs_flush_policy.should_flush(self.redis): + return + + max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush() + num_flushed = self.redis.execute_command( + "HEAD.FLUSH {}".format(max_entries_to_flush)) + log.info('num_flushed {}'.format(num_flushed)) + + # This flushes event log and log files. + ray.experimental.flush_redis_unsafe(self.redis) + + self.gcs_flush_policy.record_flush() + def run(self): """Run the monitor. @@ -578,12 +623,16 @@ def run(self): if self.autoscaler: self.autoscaler.update() + self._maybe_flush_gcs() + # Record how many dead local schedulers and plasma managers we had # at the beginning of this round. num_dead_local_schedulers = len(self.dead_local_schedulers) num_dead_plasma_managers = len(self.dead_plasma_managers) + # Process a round of messages. self.process_messages() + # If any new local schedulers or plasma managers were marked as # dead in this round, clean up the associated state. if len(self.dead_local_schedulers) > num_dead_local_schedulers: diff --git a/python/ray/services.py b/python/ray/services.py index 559b89142f6d6..0fd1280c2d703 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -470,7 +470,10 @@ def start_redis(node_ip_address, # It is important to load the credis module BEFORE the ray module, # as the latter contains an extern declaration that the former # supplies. - modules=[CREDIS_MASTER_MODULE, REDIS_MODULE]) + # NOTE: once data entries are all put under the redis shard(s) + # instead of the primary server when RAY_USE_NEW_GCS is set, we + # should load CREDIS_MASTER_MODULE here. + modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE]) if port is not None: assert assigned_port == port port = assigned_port @@ -523,7 +526,10 @@ def start_redis(node_ip_address, # It is important to load the credis module BEFORE the ray # module, as the latter contains an extern declaration that the # former supplies. - modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE]) + # NOTE: once data entries are all put under the redis shard(s) + # instead of the primary server when RAY_USE_NEW_GCS is set, we + # should load CREDIS_MEMBER_MODULE here. + modules=[CREDIS_MASTER_MODULE, REDIS_MODULE]) if redis_shard_ports[i] is not None: assert redis_shard_port == redis_shard_ports[i] @@ -533,13 +539,15 @@ def start_redis(node_ip_address, primary_redis_client.rpush("RedisShards", shard_address) if use_credis: - # Configure the chain state. - primary_redis_client.execute_command("MASTER.ADD", node_ip_address, - redis_shard_port) shard_client = redis.StrictRedis( host=node_ip_address, port=redis_shard_port) - shard_client.execute_command("MEMBER.CONNECT_TO_MASTER", - node_ip_address, port) + # Configure the chain state. + # NOTE: once data entries are all put under the redis shard(s) instead + # of the primary server when RAY_USE_NEW_GCS is set, we should swap the + # callers here. + shard_client.execute_command("MASTER.ADD", node_ip_address, port) + primary_redis_client.execute_command("MEMBER.CONNECT_TO_MASTER", + node_ip_address, redis_shard_port) return redis_address, redis_shards @@ -615,8 +623,8 @@ def _start_redis_instance(node_ip_address="127.0.0.1", port = new_port() counter += 1 if counter == num_retries: - raise Exception( - "Couldn't start Redis. Check stdout file {}".format(stdout_file)) + raise Exception("Couldn't start Redis. Check log files: {} {}".format( + stdout_file.name, stderr_file.name)) # Create a Redis client just for configuring Redis. redis_client = redis.StrictRedis(host="127.0.0.1", port=port) diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index 7ac9f7cbe38df..7bcd0c60d9f23 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -656,6 +656,7 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx, int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite, /*tail_func=*/TableAdd_DoPublish); } diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index 2a23764dd5b23..8e7fef935fb8f 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -18,8 +18,15 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_ty command_type_ = command_type; } +#if RAY_USE_NEW_GCS +// Use of kChain currently only applies to Table::Add which affects only the +// task table, and when RAY_USE_NEW_GCS is set at compile time. +AsyncGcsClient::AsyncGcsClient(const ClientID &client_id) + : AsyncGcsClient(client_id, CommandType::kChain) {} +#else AsyncGcsClient::AsyncGcsClient(const ClientID &client_id) : AsyncGcsClient(client_id, CommandType::kRegular) {} +#endif // RAY_USE_NEW_GCS AsyncGcsClient::AsyncGcsClient(CommandType command_type) : AsyncGcsClient(ClientID::from_random(), command_type) {} @@ -67,7 +74,6 @@ FunctionTable &AsyncGcsClient::function_table() { return *function_table_; } ClassTable &AsyncGcsClient::class_table() { return *class_table_; } HeartbeatTable &AsyncGcsClient::heartbeat_table() { return *heartbeat_table_; } - } // namespace gcs } // namespace ray diff --git a/test/credis_test.py b/test/credis_test.py index 869fbf833c405..9e6d58cbfa11b 100644 --- a/test/credis_test.py +++ b/test/credis_test.py @@ -26,15 +26,23 @@ def test_credis_started(self): assert "redis_address" in self.config primary = parse_client(self.config['redis_address']) assert primary.ping() is True + member = primary.lrange('RedisShards', 0, -1)[0] + shard = parse_client(member.decode()) + + # TODO(zongheng): remove these next four lines of horror, once task + # table is correctly placed in the data shard & swapping master and + # member modules. + member = self.config['redis_address'] + temp = primary + primary = shard + shard = temp # Check that primary has loaded credis' master module. chain = primary.execute_command('MASTER.GET_CHAIN') assert len(chain) == 1 # Check that the shard has loaded credis' member module. - member = primary.lrange('RedisShards', 0, -1)[0] - assert chain[0] == member - shard = parse_client(member.decode()) + assert chain[0].decode() == member assert shard.execute_command('MEMBER.SN') == -1 diff --git a/thirdparty/scripts/build_credis.sh b/thirdparty/scripts/build_credis.sh index 1c05097bf8e6f..dc394fb116811 100644 --- a/thirdparty/scripts/build_credis.sh +++ b/thirdparty/scripts/build_credis.sh @@ -18,6 +18,14 @@ set -e TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)/../ ROOT_DIR=$TP_DIR/.. +# For some reason, on Ubuntu/gcc toolchain linking against static libleveldb.a +# doesn't work, so we force building the shared library for non-Mac. +if [ "$(uname)" == "Darwin" ]; then + BUILD_LEVELDB_CONFIG="" +else + BUILD_LEVELDB_CONFIG="-DBUILD_SHARED_LIBS=on" +fi + if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then pushd "$TP_DIR/pkg/" rm -rf credis @@ -25,8 +33,7 @@ if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then popd pushd "$TP_DIR/pkg/credis" - # 4/10/2018 credis/integrate branch. With updated redis hacks. - git checkout cbe8ade35d2278b1d94684fa5d00010cb015ef82 + git checkout 273d667e5126c246b45f5dcf030b651a653136c3 # If the above commit points to different submodules' commits than # origin's head, this updates the submodules. @@ -43,18 +50,21 @@ if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then else pushd redis && make -j MALLOC=jemalloc && popd fi - pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd - # NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect! - pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd + pushd glog; cmake -DWITH_GFLAGS=off . && make -j; popd + pushd leveldb; + mkdir -p build && cd build + cmake ${BUILD_LEVELDB_CONFIG} -DCMAKE_BUILD_TYPE=Release .. && cmake --build . + popd - mkdir build + mkdir -p build pushd build - cmake .. + cmake -DCMAKE_BUILD_TYPE=Release .. make -j popd mkdir -p $ROOT_DIR/build/src/credis/redis/src/ cp redis/src/redis-server $ROOT_DIR/build/src/credis/redis/src/redis-server + mkdir -p $ROOT_DIR/build/src/credis/build/src/ cp build/src/libmaster.so $ROOT_DIR/build/src/credis/build/src/libmaster.so cp build/src/libmember.so $ROOT_DIR/build/src/credis/build/src/libmember.so