Skip to content

Fix LRU eviction of client notification datastructure #4021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 14, 2019
Merged
92 changes: 56 additions & 36 deletions src/ray/gcs/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ extern RedisChainModule module;
} \
}

/// Map from pub sub channel to clients that are waiting on that channel.
std::unordered_map<std::string, std::vector<std::string>> notification_map;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the size of this to the debug string somehow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could alternatively print a warning if this map exceeds certain size bounds, e.g. 1000, 10k, 100k.

Copy link
Contributor Author

@pcmoritz pcmoritz Feb 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, I added a redis command that can query the DEBUG_STRING


/// Parse a Redis string into a TablePubsub channel.
Status ParseTablePubsub(TablePubsub *out, const RedisModuleString *pubsub_channel_str) {
long long pubsub_channel_long;
Expand Down Expand Up @@ -117,14 +120,12 @@ Status OpenPrefixedKey(RedisModuleKey **out, RedisModuleCtx *ctx,

/// Open the key used to store the channels that should be published to when an
/// update happens at the given keyname.
Status OpenBroadcastKey(RedisModuleKey **out, RedisModuleCtx *ctx,
RedisModuleString *pubsub_channel_str, RedisModuleString *keyname,
int mode) {
Status GetBroadcastKey(RedisModuleCtx *ctx, RedisModuleString *pubsub_channel_str,
RedisModuleString *keyname, std::string *out) {
RedisModuleString *channel;
RAY_RETURN_NOT_OK(FormatPubsubChannel(&channel, ctx, pubsub_channel_str, keyname));
RedisModuleString *prefixed_keyname = RedisString_Format(ctx, "BCAST:%S", channel);
*out = reinterpret_cast<RedisModuleKey *>(
RedisModule_OpenKey(ctx, prefixed_keyname, mode));
*out = RedisString_ToString(prefixed_keyname);
return Status::OK();
}

Expand Down Expand Up @@ -171,22 +172,20 @@ int PublishTableAdd(RedisModuleCtx *ctx, RedisModuleString *pubsub_channel_str,
return RedisModule_ReplyWithError(ctx, "error during PUBLISH");
}

std::string notification_key;
REPLY_AND_RETURN_IF_NOT_OK(
GetBroadcastKey(ctx, pubsub_channel_str, id, &notification_key));
// Publish the data to any clients who requested notifications on this key.
RedisModuleKey *notification_key;
REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(&notification_key, ctx, pubsub_channel_str,
id, REDISMODULE_READ | REDISMODULE_WRITE));
if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) {
// NOTE(swang): Sets are not implemented yet, so we use ZSETs instead.
REPLY_AND_RETURN_IF_FALSE(RedisModule_ZsetFirstInScoreRange(
notification_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1) == REDISMODULE_OK,
"Unable to initialize zset iterator");
for (; !RedisModule_ZsetRangeEndReached(notification_key);
RedisModule_ZsetRangeNext(notification_key)) {
RedisModuleString *client_channel =
RedisModule_ZsetRangeCurrentElement(notification_key, NULL);
auto it = notification_map.find(notification_key);
if (it != notification_map.end()) {
for (const std::string &client_channel : it->second) {
// RedisModule_Call seems to be broken and cannot accept "bb",
// therefore we construct a temporary redis string here, which
// will be garbage collected by redis.
auto channel =
RedisModule_CreateString(ctx, client_channel.data(), client_channel.size());
RedisModuleCallReply *reply = RedisModule_Call(
ctx, "PUBLISH", "sb", client_channel, fbb.GetBufferPointer(), fbb.GetSize());
ctx, "PUBLISH", "sb", channel, fbb.GetBufferPointer(), fbb.GetSize());
if (reply == NULL) {
return RedisModule_ReplyWithError(ctx, "error during PUBLISH");
}
Expand Down Expand Up @@ -509,12 +508,10 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleStrin

// Add this client to the set of clients that should be notified when there
// are changes to the key.
RedisModuleKey *notification_key;
REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(&notification_key, ctx, pubsub_channel_str,
id, REDISMODULE_READ | REDISMODULE_WRITE));
REPLY_AND_RETURN_IF_FALSE(
RedisModule_ZsetAdd(notification_key, 0.0, client_channel, NULL) == REDISMODULE_OK,
"ZsetAdd failed.");
std::string notification_key;
REPLY_AND_RETURN_IF_NOT_OK(
GetBroadcastKey(ctx, pubsub_channel_str, id, &notification_key));
notification_map[notification_key].push_back(RedisString_ToString(client_channel));

// Lookup the current value at the key.
RedisModuleKey *table_key;
Expand Down Expand Up @@ -564,17 +561,16 @@ int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString

// Remove this client from the set of clients that should be notified when
// there are changes to the key.
RedisModuleKey *notification_key;
REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(&notification_key, ctx, pubsub_channel_str,
id, REDISMODULE_READ | REDISMODULE_WRITE));
if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) {
REPLY_AND_RETURN_IF_FALSE(
RedisModule_ZsetRem(notification_key, client_channel, NULL) == REDISMODULE_OK,
"not opened for writing or wrong type.");
size_t size = RedisModule_ValueLength(notification_key);
if (size == 0) {
REPLY_AND_RETURN_IF_FALSE(RedisModule_DeleteKey(notification_key) == REDISMODULE_OK,
"Unable to delete zset key.");
std::string notification_key;
REPLY_AND_RETURN_IF_NOT_OK(
GetBroadcastKey(ctx, pubsub_channel_str, id, &notification_key));
auto it = notification_map.find(notification_key);
if (it != notification_map.end()) {
it->second.erase(std::remove(it->second.begin(), it->second.end(),
RedisString_ToString(client_channel)),
it->second.end());
if (it->second.size() == 0) {
notification_map.erase(it);
}
}

Expand Down Expand Up @@ -646,6 +642,25 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
return result;
}

std::string DebugString() {
std::stringstream result;
result << "RedisModule:";
result << "\n- NotificationMap.size = " << notification_map.size();
result << std::endl;
return result.str();
}

int DebugString_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
RedisModule_AutoMemory(ctx);

if (argc != 1) {
return RedisModule_WrongArity(ctx);
}
std::string debug_string = DebugString();
return RedisModule_ReplyWithStringBuffer(ctx, debug_string.data(), debug_string.size());
}

extern "C" {

/* This function must be present on each Redis module. It is used in order to
Expand Down Expand Up @@ -691,6 +706,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.debug_string", DebugString_RedisCommand,
"readonly", 0, 0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

#if RAY_USE_NEW_GCS
// Chain-enabled commands that depend on ray-project/credis.
if (RedisModule_CreateCommand(ctx, "ray.chain.table_add", ChainTableAdd_RedisCommand,
Expand Down
6 changes: 6 additions & 0 deletions src/ray/gcs/redis_module/redis_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,10 @@ RedisModuleString *RedisString_Format(RedisModuleCtx *ctx, const char *fmt, ...)
return result;
}

std::string RedisString_ToString(RedisModuleString *string) {
size_t size;
const char *data = RedisModule_StringPtrLen(string, &size);
return std::string(data, size);
}

#endif // RAY_REDIS_STRING_H_
31 changes: 31 additions & 0 deletions test/jenkins_tests/multi_node_tests/test_wait_hanging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray


@ray.remote
def f():
return 0


@ray.remote
def g():
import time
start = time.time()
while time.time() < start + 1:
ray.get([f.remote() for _ in range(10)])


# 10MB -> hangs after ~5 iterations
# 20MB -> hangs after ~20 iterations
# 50MB -> hangs after ~50 iterations
ray.init(redis_max_memory=1024 * 1024 * 50)

i = 0
for i in range(100):
i += 1
a = g.remote()
[ok], _ = ray.wait([a])
print("iter", i)