Skip to content

Commit

Permalink
Experimental: enable automatic GCS flushing with configurable policy. (
Browse files Browse the repository at this point in the history
ray-project#2266)

* build_credis.sh: use an up-to-date credis commit.

* build_credis.sh: leveldb is updated, so update build cmds for it

* WIP: make monitor.py issue flush; switch gcs client to use credis

* Experimental: enable automatic GCS flushing with configurable policy.

* Fix linux compilation error

* Fix leveldb build

* Use optimized build for credis

* Address comments

* Attempt to fix tests
  • Loading branch information
concretevitamin authored and pcmoritz committed Jun 20, 2018
1 parent 60bc3a0 commit 8190ff1
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 25 deletions.
5 changes: 4 additions & 1 deletion python/ray/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
flush_redis_unsafe, flush_task_and_object_metadata_unsafe,
flush_finished_tasks_unsafe, flush_evicted_objects_unsafe,
_flush_finished_tasks_unsafe_shard, _flush_evicted_objects_unsafe_shard)
from .gcs_flush_policy import (set_flushing_policy, GcsFlushPolicy,
SimpleGcsFlushPolicy)
from .named_actors import get_actor, register_actor
from .api import get, wait

Expand All @@ -15,5 +17,6 @@
"flush_task_and_object_metadata_unsafe", "flush_finished_tasks_unsafe",
"flush_evicted_objects_unsafe", "_flush_finished_tasks_unsafe_shard",
"_flush_evicted_objects_unsafe_shard", "get_actor", "register_actor",
"get", "wait"
"get", "wait", "set_flushing_policy", "GcsFlushPolicy",
"SimpleGcsFlushPolicy"
]
12 changes: 8 additions & 4 deletions python/ray/experimental/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@
TASK_PREFIX = b"TT:"


def flush_redis_unsafe():
def flush_redis_unsafe(redis_client=None):
"""This removes some non-critical state from the primary Redis shard.
This removes the log files as well as the event log from Redis. This can
be used to try to address out-of-memory errors caused by the accumulation
of metadata in Redis. However, it will only partially address the issue as
much of the data is in the task table (and object table), which are not
flushed.
"""
ray.worker.global_worker.check_connected()
redis_client = ray.worker.global_worker.redis_client
Args:
redis_client: optional, if not provided then ray.init() must have been
called.
"""
if redis_client is None:
ray.worker.global_worker.check_connected()
redis_client = ray.worker.global_worker.redis_client

# Delete the log files from the primary Redis shard.
keys = redis_client.keys("LOGFILE:*")
Expand Down
91 changes: 91 additions & 0 deletions python/ray/experimental/gcs_flush_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import time

import ray
import ray.cloudpickle as pickle


class GcsFlushPolicy(object):
"""Experimental: a policy to control GCS flushing.
Used by Monitor to enable automatic control of memory usage.
"""

def should_flush(self, redis_client):
"""Returns a bool, whether a flush request should be issued."""
pass

def num_entries_to_flush(self):
"""Returns an upper bound for number of entries to flush next."""
pass

def record_flush(self):
"""Must be called after a flush has been performed."""
pass


class SimpleGcsFlushPolicy(GcsFlushPolicy):
"""A simple policy with constant flush rate, after a warmup period.
Example policy values:
flush_when_at_least_bytes 2GB
flush_period_secs 10s
flush_num_entries_each_time 10k
This means
(1) If the GCS shard uses less than 2GB of memory, no flushing would take
place. This should cover most Ray runs.
(2) The GCS shard will only honor a flush request, if it's issued after 10
seconds since the last processed flush. In particular this means it's
okay for the Monitor to issue requests more frequently than this param.
(3) When processing a flush, the shard will flush at most 10k entries.
This is to control the latency of each request.
Note, flush rate == (flush period) * (num entries each time). So
applications that have a heavier GCS load can tune these params.
"""

def __init__(self,
flush_when_at_least_bytes=(1 << 31),
flush_period_secs=10,
flush_num_entries_each_time=10000):
self.flush_when_at_least_bytes = flush_when_at_least_bytes
self.flush_period_secs = flush_period_secs
self.flush_num_entries_each_time = flush_num_entries_each_time
self.last_flush_timestamp = time.time()

def should_flush(self, redis_client):
if time.time() - self.last_flush_timestamp < self.flush_period_secs:
return False

used_memory = redis_client.info("memory")["used_memory"]
assert used_memory > 0

return used_memory >= self.flush_when_at_least_bytes

def num_entries_to_flush(self):
return self.flush_num_entries_each_time

def record_flush(self):
self.last_flush_timestamp = time.time()

def serialize(self):
return pickle.dumps(self)


def set_flushing_policy(flushing_policy):
"""Serialize this policy for Monitor to pick up."""
if "RAY_USE_NEW_GCS" not in os.environ:
raise Exception(
"set_flushing_policy() is only available when environment "
"variable RAY_USE_NEW_GCS is present at both compile and run time."
)
ray.worker.global_worker.check_connected()
redis_client = ray.worker.global_worker.redis_client

serialized = pickle.dumps(flushing_policy)
redis_client.set("gcs_flushing_policy", serialized)
49 changes: 49 additions & 0 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from collections import Counter, defaultdict

import ray
import ray.cloudpickle as pickle
import ray.utils
import redis
# Import flatbuffer bindings.
Expand Down Expand Up @@ -113,6 +114,19 @@ def __init__(self, redis_address, redis_port, autoscaling_config):
else:
self.autoscaler = None

# Experimental feature: GCS flushing.
self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ
self.gcs_flush_policy = None
if self.issue_gcs_flushes:
# For now, we take the primary redis server to issue flushes,
# because task table entries are stored there under this flag.
try:
self.redis.execute_command("HEAD.FLUSH 0")
except redis.exceptions.ResponseError as e:
log.info("Turning off flushing due to exception: {}".format(
str(e)))
self.issue_gcs_flushes = False

def subscribe(self, channel):
"""Subscribe to the given channel.
Expand Down Expand Up @@ -534,6 +548,37 @@ def update_local_scheduler_map(self):
or local_scheduler_info["NodeManagerAddress"]).split(":")[0]
self.local_scheduler_id_to_ip_map[client_id] = ip_address

def _maybe_flush_gcs(self):
"""Experimental: issue a flush request to the GCS.
The purpose of this feature is to control GCS memory usage.
To activate this feature, Ray must be compiled with the flag
RAY_USE_NEW_GCS set, and Ray must be started at run time with the flag
as well.
"""
if not self.issue_gcs_flushes:
return
if self.gcs_flush_policy is None:
serialized = self.redis.get("gcs_flushing_policy")
if serialized is None:
# Client has not set any policy; by default flushing is off.
return
self.gcs_flush_policy = pickle.loads(serialized)

if not self.gcs_flush_policy.should_flush(self.redis):
return

max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush()
num_flushed = self.redis.execute_command(
"HEAD.FLUSH {}".format(max_entries_to_flush))
log.info('num_flushed {}'.format(num_flushed))

# This flushes event log and log files.
ray.experimental.flush_redis_unsafe(self.redis)

self.gcs_flush_policy.record_flush()

def run(self):
"""Run the monitor.
Expand Down Expand Up @@ -578,12 +623,16 @@ def run(self):
if self.autoscaler:
self.autoscaler.update()

self._maybe_flush_gcs()

# Record how many dead local schedulers and plasma managers we had
# at the beginning of this round.
num_dead_local_schedulers = len(self.dead_local_schedulers)
num_dead_plasma_managers = len(self.dead_plasma_managers)

# Process a round of messages.
self.process_messages()

# If any new local schedulers or plasma managers were marked as
# dead in this round, clean up the associated state.
if len(self.dead_local_schedulers) > num_dead_local_schedulers:
Expand Down
26 changes: 17 additions & 9 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,10 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray module,
# as the latter contains an extern declaration that the former
# supplies.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
# NOTE: once data entries are all put under the redis shard(s)
# instead of the primary server when RAY_USE_NEW_GCS is set, we
# should load CREDIS_MASTER_MODULE here.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
if port is not None:
assert assigned_port == port
port = assigned_port
Expand Down Expand Up @@ -523,7 +526,10 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray
# module, as the latter contains an extern declaration that the
# former supplies.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
# NOTE: once data entries are all put under the redis shard(s)
# instead of the primary server when RAY_USE_NEW_GCS is set, we
# should load CREDIS_MEMBER_MODULE here.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])

if redis_shard_ports[i] is not None:
assert redis_shard_port == redis_shard_ports[i]
Expand All @@ -533,13 +539,15 @@ def start_redis(node_ip_address,
primary_redis_client.rpush("RedisShards", shard_address)

if use_credis:
# Configure the chain state.
primary_redis_client.execute_command("MASTER.ADD", node_ip_address,
redis_shard_port)
shard_client = redis.StrictRedis(
host=node_ip_address, port=redis_shard_port)
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, port)
# Configure the chain state.
# NOTE: once data entries are all put under the redis shard(s) instead
# of the primary server when RAY_USE_NEW_GCS is set, we should swap the
# callers here.
shard_client.execute_command("MASTER.ADD", node_ip_address, port)
primary_redis_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, redis_shard_port)

return redis_address, redis_shards

Expand Down Expand Up @@ -615,8 +623,8 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
port = new_port()
counter += 1
if counter == num_retries:
raise Exception(
"Couldn't start Redis. Check stdout file {}".format(stdout_file))
raise Exception("Couldn't start Redis. Check log files: {} {}".format(
stdout_file.name, stderr_file.name))

# Create a Redis client just for configuring Redis.
redis_client = redis.StrictRedis(host="127.0.0.1", port=port)
Expand Down
1 change: 1 addition & 0 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);
return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite,
/*tail_func=*/TableAdd_DoPublish);
}
Expand Down
8 changes: 7 additions & 1 deletion src/ray/gcs/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_ty
command_type_ = command_type;
}

#if RAY_USE_NEW_GCS
// Use of kChain currently only applies to Table::Add which affects only the
// task table, and when RAY_USE_NEW_GCS is set at compile time.
AsyncGcsClient::AsyncGcsClient(const ClientID &client_id)
: AsyncGcsClient(client_id, CommandType::kChain) {}
#else
AsyncGcsClient::AsyncGcsClient(const ClientID &client_id)
: AsyncGcsClient(client_id, CommandType::kRegular) {}
#endif // RAY_USE_NEW_GCS

AsyncGcsClient::AsyncGcsClient(CommandType command_type)
: AsyncGcsClient(ClientID::from_random(), command_type) {}
Expand Down Expand Up @@ -67,7 +74,6 @@ FunctionTable &AsyncGcsClient::function_table() { return *function_table_; }
ClassTable &AsyncGcsClient::class_table() { return *class_table_; }

HeartbeatTable &AsyncGcsClient::heartbeat_table() { return *heartbeat_table_; }

} // namespace gcs

} // namespace ray
14 changes: 11 additions & 3 deletions test/credis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@ def test_credis_started(self):
assert "redis_address" in self.config
primary = parse_client(self.config['redis_address'])
assert primary.ping() is True
member = primary.lrange('RedisShards', 0, -1)[0]
shard = parse_client(member.decode())

# TODO(zongheng): remove these next four lines of horror, once task
# table is correctly placed in the data shard & swapping master and
# member modules.
member = self.config['redis_address']
temp = primary
primary = shard
shard = temp

# Check that primary has loaded credis' master module.
chain = primary.execute_command('MASTER.GET_CHAIN')
assert len(chain) == 1

# Check that the shard has loaded credis' member module.
member = primary.lrange('RedisShards', 0, -1)[0]
assert chain[0] == member
shard = parse_client(member.decode())
assert chain[0].decode() == member
assert shard.execute_command('MEMBER.SN') == -1


Expand Down
24 changes: 17 additions & 7 deletions thirdparty/scripts/build_credis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@ set -e
TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)/../
ROOT_DIR=$TP_DIR/..

# For some reason, on Ubuntu/gcc toolchain linking against static libleveldb.a
# doesn't work, so we force building the shared library for non-Mac.
if [ "$(uname)" == "Darwin" ]; then
BUILD_LEVELDB_CONFIG=""
else
BUILD_LEVELDB_CONFIG="-DBUILD_SHARED_LIBS=on"
fi

if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
pushd "$TP_DIR/pkg/"
rm -rf credis
git clone --recursive https://github.com/ray-project/credis
popd

pushd "$TP_DIR/pkg/credis"
# 4/10/2018 credis/integrate branch. With updated redis hacks.
git checkout cbe8ade35d2278b1d94684fa5d00010cb015ef82
git checkout 273d667e5126c246b45f5dcf030b651a653136c3

# If the above commit points to different submodules' commits than
# origin's head, this updates the submodules.
Expand All @@ -43,18 +50,21 @@ if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
else
pushd redis && make -j MALLOC=jemalloc && popd
fi
pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd
# NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect!
pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd
pushd glog; cmake -DWITH_GFLAGS=off . && make -j; popd
pushd leveldb;
mkdir -p build && cd build
cmake ${BUILD_LEVELDB_CONFIG} -DCMAKE_BUILD_TYPE=Release .. && cmake --build .
popd

mkdir build
mkdir -p build
pushd build
cmake ..
cmake -DCMAKE_BUILD_TYPE=Release ..
make -j
popd

mkdir -p $ROOT_DIR/build/src/credis/redis/src/
cp redis/src/redis-server $ROOT_DIR/build/src/credis/redis/src/redis-server

mkdir -p $ROOT_DIR/build/src/credis/build/src/
cp build/src/libmaster.so $ROOT_DIR/build/src/credis/build/src/libmaster.so
cp build/src/libmember.so $ROOT_DIR/build/src/credis/build/src/libmember.so
Expand Down

0 comments on commit 8190ff1

Please sign in to comment.