Skip to content
Merged
28 changes: 19 additions & 9 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,24 @@ def __init__(self, redis_address, redis_port, autoscaling_config):
self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ
self.gcs_flush_policy = None
if self.issue_gcs_flushes:
# For now, we take the primary redis server to issue flushes,
# because task table entries are stored there under this flag.
try:
self.redis.execute_command("HEAD.FLUSH 0")
except redis.exceptions.ResponseError as e:
log.info("Turning off flushing due to exception: {}".format(
str(e)))
# Data is stored under the first data shard, so we issue flushes to
# that redis server.
addr_port = self.redis.lrange("RedisShards", 0, -1)
if len(addr_port) > 1:
log.warning("TODO: if launching > 1 redis shard, flushing "
"needs to touch shards in parallel.")
self.issue_gcs_flushes = False
else:
addr_port = addr_port[0].split(b":")
self.redis_shard = redis.StrictRedis(
host=addr_port[0], port=addr_port[1])
try:
self.redis_shard.execute_command("HEAD.FLUSH 0")
except redis.exceptions.ResponseError as e:
log.info(
"Turning off flushing due to exception: {}".format(
str(e)))
self.issue_gcs_flushes = False

def subscribe(self, channel):
"""Subscribe to the given channel.
Expand Down Expand Up @@ -562,11 +572,11 @@ def _maybe_flush_gcs(self):
return
self.gcs_flush_policy = pickle.loads(serialized)

if not self.gcs_flush_policy.should_flush(self.redis):
if not self.gcs_flush_policy.should_flush(self.redis_shard):
return

max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush()
num_flushed = self.redis.execute_command(
num_flushed = self.redis_shard.execute_command(
"HEAD.FLUSH {}".format(max_entries_to_flush))
log.info('num_flushed {}'.format(num_flushed))

Expand Down
5 changes: 5 additions & 0 deletions src/common/redis_module/chain_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class RedisChainModule {

// Runs "node_func" on every node in the chain; after the tail node has run it
// too, finalizes the mutation by running "tail_func".
//
// If node_func() returns non-zero, it is treated as an error and the entire
// update will terminate early, without running subsequent node_func() and the
// final tail_func().
//
// TODO(zongheng): currently only supports 1-node chain.
int ChainReplicate(RedisModuleCtx *ctx,
RedisModuleString **argv,
Expand Down
115 changes: 76 additions & 39 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -654,37 +654,15 @@ int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx,
}
#endif

/// Append an entry to the log stored at a key. Publishes a notification about
/// the update to all subscribers, if a pubsub channel is provided.
///
/// This is called from a client with the command:
//
/// RAY.TABLE_APPEND <table_prefix> <pubsub_channel> <id> <data>
/// <index (optional)>
///
/// \param table_prefix The prefix string for keys in this table.
/// \param pubsub_channel The pubsub channel name that notifications for
/// this key should be published to. When publishing to a specific
/// client, the channel name should be <pubsub_channel>:<client_id>.
/// \param id The ID of the key to append to.
/// \param data The data to append to the key.
/// \param index If this is set, then the data must be appended at this index.
/// If the current log is shorter or longer than the requested index,
/// then the append will fail and an error message will be returned as a
/// string.
/// \return OK if the append succeeds, or an error message string if the append
/// fails.
int TableAppend_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

int TableAppend_DoWrite(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc,
RedisModuleString **mutated_key_str) {
if (argc < 5 || argc > 6) {
return RedisModule_WrongArity(ctx);
}

RedisModuleString *prefix_str = argv[1];
RedisModuleString *pubsub_channel_str = argv[2];
RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4];
RedisModuleString *index_str = nullptr;
Expand All @@ -693,8 +671,9 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx,
}

// Set the keys in the table.
RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id,
REDISMODULE_READ | REDISMODULE_WRITE);
RedisModuleKey *key =
OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE,
mutated_key_str);
// Determine the index at which the data should be appended. If no index is
// requested, then is the current length of the log.
size_t index = RedisModule_ValueLength(key);
Expand All @@ -717,23 +696,76 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx,
// necessary since we implement the log with a sorted set, so all entries
// must be unique, or else we will have gaps in the log.
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry";
// Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the
// channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
return REDISMODULE_OK;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to be more careful about which functions (node vs. tail) should return a Redis reply and when, to make sure that every Redis command returns exactly one reply. I think it should be:

  1. If node func errors, then it should return an error reply.
  2. If node func does not error, then it should not return anything.
  3. Tail func should always return a Redis reply.

I think this means that for the master commit on credis, we are incorrectly handling the case where the node func does not error and the command is executed on a non-tail node. In this case, the node func does not return a reply and the tail func isn't run, so we won't be returning a Redis reply.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanie-wang yes, bullets 1-3 are exactly what I have in mind.

For the singleton-chain case (default deployment), both ActAsTail() and ActAsHead() will be true. So if the node func does not error, the tail func will be run, and hence we will get exactly 1 reply.

For the distributed (>1-node chain) case, the ChainReplicate() interface designed specifically for ray's redis module lacks this distributed support (i.e., propagating the update). (credis' native MemberPut_RedisCommand()/Put() do handle propagation, of course.) Since it's not handled right now, we should make sure the 1-node case is correct for the moment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, just something to keep in mind once we move to distributed.

} else {
// The requested index did not match the current length of the log. Return
// an error message as a string.
const char *reply = "ERR entry exists";
return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
static const char *reply = "ERR entry exists";
RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use RedisModule_ReplyWithError here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also returns OK (0),

 * The function always returns REDISMODULE_OK.
 */
int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
    return replyWithStatus(ctx,err,"-");
}

So I'd like to not do this & keep this PR as close to the original behavior as possible.

return REDISMODULE_ERR;
}
}

/// Publish step of a table append; used as the tail function of the chained
/// append command and as the second half of the regular one.
///
/// Reads the pubsub channel from argv[2], the key ID from argv[3] and the
/// appended data from argv[4]. The caller is responsible for having validated
/// that argv holds at least five entries; argc is not consulted here.
///
/// \param ctx The Redis module context used for replying.
/// \param argv The original command arguments.
/// \return The result of PublishTableAdd when a pubsub channel is requested,
/// otherwise the result of replying with the simple string "OK".
int TableAppend_DoPublish(RedisModuleCtx *ctx,
                          RedisModuleString **argv,
                          int /*argc*/) {
  RedisModuleString *const channel_str = argv[2];
  if (ParseTablePubsub(channel_str) == TablePubsub::NO_PUBLISH) {
    // No notification requested; just acknowledge the append.
    return RedisModule_ReplyWithSimpleString(ctx, "OK");
  }
  // All other pubsub channels write the data back directly onto the channel.
  RedisModuleString *const entry_id = argv[3];
  RedisModuleString *const entry_data = argv[4];
  return PublishTableAdd(ctx, channel_str, entry_id, entry_data);
}

/// Append an entry to the log stored at a key. Publishes a notification about
/// the update to all subscribers, if a pubsub channel is provided.
///
/// This is called from a client with the command:
///
///    RAY.TABLE_APPEND <table_prefix> <pubsub_channel> <id> <data>
///                     <index (optional)>
///
/// \param table_prefix The prefix string for keys in this table.
/// \param pubsub_channel The pubsub channel name that notifications for
///        this key should be published to. When publishing to a specific
///        client, the channel name should be <pubsub_channel>:<client_id>.
/// \param id The ID of the key to append to.
/// \param data The data to append to the key.
/// \param index If this is set, then the data must be appended at this index.
///        If the current log is shorter or longer than the requested index,
///        then the append will fail and an error message will be returned as a
///        string.
/// \return OK if the append succeeds, or an error message string if the append
///         fails. A call with the wrong number of arguments gets a single
///         wrong-arity error reply and performs neither the write nor the
///         publish.
int TableAppend_RedisCommand(RedisModuleCtx *ctx,
                             RedisModuleString **argv,
                             int argc) {
  RedisModule_AutoMemory(ctx);

  // Validate arity here rather than relying on TableAppend_DoWrite's status:
  // RedisModule_WrongArity() always returns REDISMODULE_OK (0), so a wrong-arity
  // "failure" inside the write step is indistinguishable from success and
  // would let the publish step run, reading argv[2..4] out of bounds and
  // sending a second reply for the same command.
  if (argc < 5 || argc > 6) {
    return RedisModule_WrongArity(ctx);
  }

  const int status = TableAppend_DoWrite(ctx, argv, argc,
                                         /*mutated_key_str=*/nullptr);
  if (status != 0) {
    // The write step reported a failure; do not publish.
    return status;
  }
  return TableAppend_DoPublish(ctx, argv, argc);
}

#if RAY_USE_NEW_GCS
/// Chain-replicated variant of the table append command. Takes the same
/// arguments as RAY.TABLE_APPEND and runs TableAppend_DoWrite as the per-node
/// function and TableAppend_DoPublish as the tail function via
/// module.ChainReplicate.
///
/// \return The wrong-arity reply status if the argument count is invalid,
/// otherwise whatever module.ChainReplicate returns.
int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx,
                                  RedisModuleString **argv,
                                  int argc) {
  RedisModule_AutoMemory(ctx);

  // Reject bad arity before replicating. TableAppend_DoWrite signals wrong
  // arity through RedisModule_WrongArity(), which always returns
  // REDISMODULE_OK (0); a zero status from the node function does not stop the
  // tail function, so TableAppend_DoPublish would otherwise read argv[2..4]
  // out of bounds and emit a second reply.
  if (argc < 5 || argc > 6) {
    return RedisModule_WrongArity(ctx);
  }

  return module.ChainReplicate(ctx, argv, argc,
                               /*node_func=*/TableAppend_DoWrite,
                               /*tail_func=*/TableAppend_DoPublish);
}
#endif

/// A helper function to create and finish a GcsTableEntry, based on the
/// current value or values at the given key.
void TableEntryToFlatbuf(RedisModuleKey *table_key,
Expand Down Expand Up @@ -1833,6 +1865,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "ray.chain.table_append",
ChainTableAppend_RedisCommand, "write pubsub",
0, 0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
#endif

return REDISMODULE_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_ty
context_ = std::make_shared<RedisContext>();
primary_context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(primary_context_, this, client_id));
object_table_.reset(new ObjectTable(context_, this));
object_table_.reset(new ObjectTable(context_, this, command_type));
actor_table_.reset(new ActorTable(context_, this));
task_table_.reset(new TaskTable(context_, this, command_type));
raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type));
Expand Down
49 changes: 36 additions & 13 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,34 @@
#include "common_protocol.h"
#include "ray/gcs/client.h"

namespace {

static const std::string kTableAppendCommand = "RAY.TABLE_APPEND";
static const std::string kChainTableAppendCommand = "RAY.CHAIN.TABLE_APPEND";

static const std::string kTableAddCommand = "RAY.TABLE_ADD";
static const std::string kChainTableAddCommand = "RAY.CHAIN.TABLE_ADD";

/// Returns the Redis command name used to append to a log for the given
/// command type: the regular append command for kRegular, the chain-replicated
/// one for kChain. Any other value trips the RAY_CHECK.
std::string GetLogAppendCommand(const ray::gcs::CommandType command_type) {
  using ray::gcs::CommandType;
  if (command_type != CommandType::kRegular) {
    // Only two command types are supported.
    RAY_CHECK(command_type == CommandType::kChain);
    return kChainTableAppendCommand;
  }
  return kTableAppendCommand;
}

/// Returns the Redis command name used to add a table entry for the given
/// command type: the regular add command for kRegular, the chain-replicated
/// one for kChain. Any other value trips the RAY_CHECK.
std::string GetTableAddCommand(const ray::gcs::CommandType command_type) {
  using ray::gcs::CommandType;
  if (command_type != CommandType::kRegular) {
    // Only two command types are supported.
    RAY_CHECK(command_type == CommandType::kChain);
    return kChainTableAddCommand;
  }
  return kTableAddCommand;
}

} // namespace

namespace ray {

namespace gcs {
Expand All @@ -19,8 +47,9 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, dataT.get()));
return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback));
return context_->RunAsync(GetLogAppendCommand(command_type_), id,
fbb.GetBufferPointer(), fbb.GetSize(), prefix_,
pubsub_channel_, std::move(callback));
}

template <typename ID, typename Data>
Expand All @@ -42,8 +71,9 @@ Status Log<ID, Data>::AppendAt(const JobID &job_id, const ID &id,
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, dataT.get()));
return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback), log_length);
return context_->RunAsync(GetLogAppendCommand(command_type_), id,
fbb.GetBufferPointer(), fbb.GetSize(), prefix_,
pubsub_channel_, std::move(callback), log_length);
}

template <typename ID, typename Data>
Expand Down Expand Up @@ -140,15 +170,8 @@ Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, dataT.get()));
if (command_type_ == CommandType::kRegular) {
return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback));
} else {
RAY_CHECK(command_type_ == CommandType::kChain);
return context_->RunAsync("RAY.CHAIN.TABLE_ADD", id, fbb.GetBufferPointer(),
fbb.GetSize(), prefix_, pubsub_channel_,
std::move(callback));
}
return context_->RunAsync(GetTableAddCommand(command_type_), id, fbb.GetBufferPointer(),
fbb.GetSize(), prefix_, pubsub_channel_, std::move(callback));
}

template <typename ID, typename Data>
Expand Down
7 changes: 7 additions & 0 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ class ObjectTable : public Log<ObjectID, ObjectTableData> {
pubsub_channel_ = TablePubsub::OBJECT;
prefix_ = TablePrefix::OBJECT;
};

/// Construct an ObjectTable with an explicit command type.
///
/// Delegates to the two-argument constructor and then stores command_type in
/// command_type_.
///
/// \param context The Redis context passed through to the two-argument
/// constructor.
/// \param client The GCS client passed through to the two-argument
/// constructor.
/// \param command_type The command type to record for this table.
ObjectTable(const std::shared_ptr<RedisContext> &context,
            AsyncGcsClient *client,
            gcs::CommandType command_type)
    : ObjectTable(context, client) {
  command_type_ = command_type;
}

virtual ~ObjectTable(){};
};

Expand Down