Request and cancel notifications in the new GCS API #1758
Test FAILed.
Test PASSed.
Test PASSed.
…s instead of through publish
A few concerns:
- I suggest we stay away from boost. Asio, sure, but not other stuff.
- tables.h has become larger and contains many definitions instead of pure declarations. Consider separating them into a tables.cc?
RAY_CHECK(RedisModule_StringToLongLong(
              pubsub_channel_str, &pubsub_channel_long) == REDISMODULE_OK)
    << "Prefix must be a valid TablePrefix";
char pubsub_channel[sizeof(TablePubsub) + 1];
Why +1?
For the null terminator, although this doesn't properly measure the maximum number of characters needed... I changed this to TablePubsub_MAX.
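To illustrate the sizing concern: sizeof on an enum gives its size in bytes, not the number of decimal characters needed to print it. The following is a minimal sketch of one generic way to size such a buffer. It is not what the PR does; Channel, kMaxDecimalChars and FormatChannel are hypothetical names, and Channel only stands in for the generated TablePubsub enum.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <limits>
#include <string>

// Hypothetical stand-in for the generated TablePubsub enum; values are made up.
enum class Channel : int { kNoPublish = 1, kTask = 2, kObject = 4 };

// digits10 counts the decimal digits that always fit in an int; the extreme
// values need one more digit, plus a sign and the terminating NUL.
constexpr std::size_t kMaxDecimalChars =
    std::numeric_limits<int>::digits10 + 3;

// Format the channel as a decimal string using a buffer sized for any int.
std::string FormatChannel(Channel channel) {
  char buffer[kMaxDecimalChars];
  const int written =
      std::snprintf(buffer, sizeof(buffer), "%d", static_cast<int>(channel));
  // snprintf returns the length it wanted to write, excluding the NUL.
  assert(written > 0 && static_cast<std::size_t>(written) < sizeof(buffer));
  return std::string(buffer, static_cast<std::size_t>(written));
}
```

Sizing from the numeric limits of the underlying type keeps the buffer correct even if new enum values are added later.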
@@ -537,6 +617,75 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx,
  return REDISMODULE_OK;
}

int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
Please add some doc
  }
}

int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx,
doc?
@@ -554,14 +703,15 @@ bool is_nil(const std::string &data) {
int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
                                    RedisModuleString **argv,
                                    int argc) {
  if (argc != 4) {
  if (argc != 5) {
Let's add doc on args:
// This is a temporary redis command that will be removed once
// the GCS uses https://github.com/pcmoritz/credis.
// Be careful, this only supports Task Table payloads.
// Args:
// prefix_str: ...
// ...
RedisModule_Call(ctx, "PUBLISH", "ss", client_channel, data);
if (reply == NULL) {
  RedisModule_CloseKey(notification_key);
  RedisModule_ReplyWithError(ctx, "error during PUBLISH");
Should this be return RedisModule_ReplyWithError(ctx, "error during PUBLISH");? Otherwise the key will be closed twice (which I think is fine, it just stands out as odd).
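A minimal sketch of the control flow being suggested, using fakes rather than the Redis module API. FakeKey, CloseKey, PublishNotification, kOk and kErr are all hypothetical names for illustration, and the fake deliberately treats a double close as an error, which is stricter than what the reviewer expects of the real API.

```cpp
#include <cassert>

constexpr int kOk = 0;
constexpr int kErr = 1;

// Hypothetical stand-in for an open Redis module key.
struct FakeKey {
  bool closed = false;
};

void CloseKey(FakeKey &key) {
  assert(!key.closed);  // Flags a double close in this sketch.
  key.closed = true;
}

// Close the key exactly once on both the error path and the success path.
int PublishNotification(FakeKey &key, bool publish_succeeded) {
  if (!publish_succeeded) {
    CloseKey(key);
    return kErr;  // Returning here keeps control from reaching the close below.
  }
  CloseKey(key);
  return kOk;
}
```

Without the early return, the error path would fall through and reach the second CloseKey call.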
Thanks!
      }
    }
  }
  RedisModule_CloseKey(notification_key);
This redis command needs to return an int (OK or ERROR).
It does (further down).
@@ -517,13 +596,14 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
int TableLookup_RedisCommand(RedisModuleCtx *ctx,
                             RedisModuleString **argv,
                             int argc) {
  if (argc != 3) {
  if (argc != 4) {
Please document args
@@ -394,17 +447,18 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
int TableAdd_RedisCommand(RedisModuleCtx *ctx,
This command seems very bloated now. A rule of thumb I learned was any func > 50 lines should probably be broken down.
Sounds good, although a lot of the bloat will hopefully go away soon, since a lot of the code here is only necessary to support legacy code (like the task table).
src/ray/gcs/redis_context.cc
Outdated
@@ -24,24 +24,37 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
  }
  int64_t callback_index = reinterpret_cast<int64_t>(privdata);
  redisReply *reply = reinterpret_cast<redisReply *>(r);
  // We use an optional response to distinguish between a nil response and a
  // response of an empty string.
  boost::optional<const std::string &> optional_data;
Please don't use boost optional. My unpopular opinion is we shouldn't use boost at all.
boost::optional is not in the approved subset by Google style: http://google.github.io/styleguide/cppguide.html#Boost
We should transition to abseil. https://abseil.io/tips/123
+1 to not using boost anywhere except for asio.
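For reference, the nil-versus-empty-string distinction from the diff can be expressed without boost. The sketch below uses a small hand-rolled holder so that it compiles as C++11; absl::optional or std::optional would be the library equivalents. MaybeString, FakeReplyType and ExtractData are hypothetical names, and this is not necessarily what the PR ended up doing.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical minimal holder: either "no value" (nil) or a string that may
// itself be empty.
class MaybeString {
 public:
  MaybeString() : has_value_(false) {}
  explicit MaybeString(std::string value)
      : has_value_(true), value_(std::move(value)) {}

  bool has_value() const { return has_value_; }

  const std::string &value() const {
    assert(has_value_);  // Reading a nil reply is a caller bug.
    return value_;
  }

 private:
  bool has_value_;
  std::string value_;
};

// Hypothetical reply kinds, loosely modeled on the nil and string replies
// handled in GlobalRedisCallback.
enum class FakeReplyType { kNil, kString };

MaybeString ExtractData(FakeReplyType type, const std::string &payload) {
  if (type == FakeReplyType::kNil) {
    return MaybeString();  // Nil: no value at all.
  }
  return MaybeString(payload);  // A string reply, possibly empty.
}
```

A caller can then branch on has_value() to trigger a lookup-failure path only for nil replies, while still accepting an empty payload as data.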
21e2468 to f37160d
Test FAILed.
Test PASSed.
Test PASSed.
Test PASSed.
Is this ready for another look?
…On Wed, Mar 21, 2018 at 12:07 UCB AMPLab ***@***.***> wrote:
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4455/
Test PASSed.
Yes, that would be great, thanks, @concretevitamin! I made some significant changes to the |
LGTM. Just need to make sure we coordinate this change with the soon-to-be xray-task-forwarding PR, which uses the object table lookup failure callback.
Taking a look!
…On Wed, Mar 21, 2018 at 12:42 PM Melih Elibol ***@***.***> wrote:
LGTM. Just need to make sure we coordinate this change with the soon-to-be
xray-task-forwarding PR, which uses the object table lookup failure
callback.
@@ -53,6 +53,31 @@ static const char *table_prefixes[] = {
    NULL, "TASK:", "CLIENT:", "OBJECT:", "FUNCTION:",
};

/// Parse a Redis string into a TablePubsub channel.
TablePubsub ParseTablePubsub(RedisModuleString *pubsub_channel_str) {
const (to indicate no changes/freeing, etc)

/// Format a pubsub channel for a specific key. pubsub_channel_str should
/// contain a valid TablePubsub.
RedisModuleString *FormatPubsubChannel(RedisModuleCtx *ctx,
const the args if you can
src/ray/gcs/tables.h
Outdated
@@ -158,6 +131,9 @@ class Table {
  TablePubsub pubsub_channel_;
  /// The prefix to use for keys in this table.
  TablePrefix prefix_;
  /// Whether we have subscribed to the table yet. Each client may only
"Whether" is not informative, I actually had to search for this arg to understand.
Say something like, ">= 0 iff we have subscribed, otherwise -1."
src/ray/gcs/tables.cc
Outdated
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, data.get()));
RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(),
Can be simplified to return context_->RunAsync(..);
src/ray/gcs/tables.cc
Outdated
  return true;
});
std::vector<uint8_t> nil;
RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), nil.size(),
can be simplified
src/ray/gcs/redis_context.cc
Outdated
  RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string "
                 << reply->str;
}

if (callback_index >= 0) {
Warrants an anon-namespace helper since it repeats, in my opinion
MaybeRemoveCallback(callback_index);
for
+ if (callback_index >= 0) {
+ bool delete_callback = RedisCallbackManager::instance().get(callback_index)(data);
+ if (delete_callback) {
+ RedisCallbackManager::instance().remove(callback_index);
+ }
+ }
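A minimal sketch of what such an anonymous-namespace helper could look like. FakeCallbackManager is a hypothetical stand-in written only so the example is self-contained (the real RedisCallbackManager may differ), and the helper name RunAndMaybeRemoveCallback is also an assumption; unlike the name suggested above, it takes the data argument that the repeated block passes to the callback.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for RedisCallbackManager.
class FakeCallbackManager {
 public:
  using Callback = std::function<bool(const std::string &)>;

  static FakeCallbackManager &instance() {
    static FakeCallbackManager manager;
    return manager;
  }

  int64_t add(Callback callback) {
    callbacks_[next_index_] = std::move(callback);
    return next_index_++;
  }

  Callback &get(int64_t index) {
    assert(callbacks_.count(index) == 1);
    return callbacks_[index];
  }

  void remove(int64_t index) { callbacks_.erase(index); }

  std::size_t size() const { return callbacks_.size(); }

 private:
  std::unordered_map<int64_t, Callback> callbacks_;
  int64_t next_index_ = 0;
};

namespace {

// Run the callback registered at callback_index, if any, and remove it when
// it reports that it is done. A negative index means "no callback".
void RunAndMaybeRemoveCallback(int64_t callback_index, const std::string &data) {
  if (callback_index < 0) {
    return;
  }
  FakeCallbackManager &manager = FakeCallbackManager::instance();
  const bool delete_callback = manager.get(callback_index)(data);
  if (delete_callback) {
    manager.remove(callback_index);
  }
}

}  // namespace
```

Each call site then shrinks to a single line, and the removal rule lives in one place.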
  data = std::string(reply->str, reply->len);
} else if (reply->type == REDIS_REPLY_STATUS) {
} else if (reply->type == REDIS_REPLY_ERROR) {
std::vector<std::string> data;
It seems data always has size 1 throughout this PR. If that's true, what's the reason for changing the interface to a vector (by default I think it allocates a few more slots than the absolute minimum, which is 1; this might be wrong though)?
Eventually we want to transition to an interface where an operation appends a value instead of just overwriting it; overwriting isn't flexible enough to support the object table, for instance. One thing that could work is if we use the std::vector interface internally, but the interface for the table caller can still take in just a single data. Would you prefer that we do that in this PR?
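A minimal sketch of that idea: keep a vector-based callback type internally and adapt a single-item callback onto it. InternalCallback, SingleItemCallback and AdaptSingleItem are hypothetical names, and strings stand in for the table's data type.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Internal signature: a notification carries zero or more entries.
using InternalCallback = std::function<void(const std::vector<std::string> &)>;
// Caller-facing signature: a reference to a single item.
using SingleItemCallback = std::function<void(const std::string &)>;

// Wrap a single-item callback so it can be registered wherever the
// vector-based signature is expected.
InternalCallback AdaptSingleItem(SingleItemCallback callback) {
  return [callback](const std::vector<std::string> &entries) {
    // Assumption for this sketch: overwrite-style tables never hold more
    // than one entry per key.
    assert(entries.size() <= 1);
    if (!entries.empty()) {
      callback(entries.front());
    }
  };
}
```

With an adapter like this, tables that append entries can use the vector signature directly, while existing callers keep their single-item callbacks.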
src/ray/gcs/client_test.cc
Outdated
ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(data.size(), 1);
ASSERT_EQ(data[0].managers, std::vector<std::string>({"A", "B"}));
I imagine (and hope) ASSERT_EQ(data[0].managers, {"A", "B"}); just works.
It does not, I think because of the ASSERT_EQ macro:
error: macro "ASSERT_EQ" passed 3 arguments, but takes just 2
ASSERT_EQ(data[0].managers, {"A", "B"});
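The error comes from the preprocessor, which splits macro arguments on every comma that is not inside parentheses; braces do not count as grouping. A minimal sketch that reproduces the situation without gtest, where SKETCH_ASSERT_EQ and CheckManagers are hypothetical names and the macro only stands in for ASSERT_EQ:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical two-argument macro standing in for gtest's ASSERT_EQ.
#define SKETCH_ASSERT_EQ(lhs, rhs) assert((lhs) == (rhs))

void CheckManagers(const std::vector<std::string> &managers) {
  // SKETCH_ASSERT_EQ(managers, {"A", "B"}) would not compile: the comma
  // inside the braces splits the call into three macro arguments.
  //
  // Naming the type puts the braces inside parentheses, which protects the
  // comma, so the macro sees exactly two arguments.
  SKETCH_ASSERT_EQ(managers, std::vector<std::string>({"A", "B"}));
}
```

Because the split happens before the compiler sees any types, swapping the argument order does not help; only parentheses around the braced list do.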
src/ray/gcs/client_test.cc
Outdated
// Check that the object entry was added.
ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
ASSERT_EQ(data.size(), 1);
ASSERT_EQ(data[0].managers, std::vector<std::string>({"A", "B"}));
Same
src/ray/gcs/client_test.cc
Outdated
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(data.size(), 1);
// Check that the object entry was added.
ASSERT_EQ(data[0].managers, std::vector<std::string>({"A", "B"}));
same
For ASSERT_EQ I think a convention is to put the expected literals as the first arg. Would that fix the compiler issue?
…On Wed, Mar 21, 2018 at 14:22 Stephanie Wang ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In src/ray/gcs/redis_context.cc
<#1758 (comment)>:
> @@ -24,24 +24,31 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
redisReply *reply = reinterpret_cast<redisReply *>(r);
- std::string data = "";
- if (reply->type == REDIS_REPLY_NIL) {
- // Respond with blank string, which triggers a failure callback for lookups.
- } else if (reply->type == REDIS_REPLY_STRING) {
- data = std::string(reply->str, reply->len);
- } else if (reply->type == REDIS_REPLY_ARRAY) {
- reply = reply->element[reply->elements - 1];
- data = std::string(reply->str, reply->len);
- } else if (reply->type == REDIS_REPLY_STATUS) {
- } else if (reply->type == REDIS_REPLY_ERROR) {
+ std::vector<std::string> data;
Eventually we want to transition to an interface where an operation
appends a value instead of just overwriting it, which isn't flexible enough
to support the object table, for instance. One thing that could work is if
we use the std::vector interface internally, but the interface for the
table caller can still take in just a single data. Would you prefer that
we do that in this PR?
It does not fix the compiler issue.
This reverts commit 6e3e690.
…kes a reference to a single item
Test FAILed.
Test PASSed.
Test PASSed.
LGTM. Feel free to self-merge after addressing a few nits!
src/ray/gcs/tables.cc
Outdated
std::vector<uint8_t> nil;
return context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), nil.size(), prefix_,
                          pubsub_channel_, callback_index);
return Status::OK();
How does double returning pass all our tests?
I don't know...oh dear.
src/ray/gcs/tables.cc
Outdated
fbb.Finish(Data::Pack(fbb, data.get()));
return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(),
                          prefix_, pubsub_channel_, callback_index);
return Status::OK();
double returning ?!
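On why the tests still pass: a second return after an unconditional return is unreachable but legal C++, and whether a compiler warns about it depends on the toolchain and flags. The first return already propagates the right result, so behavior is unchanged. A minimal sketch, in which FakeStatus, RunAsync and Lookup are hypothetical stand-ins rather than Ray's types:

```cpp
#include <cassert>

// Hypothetical minimal status type for this sketch.
struct FakeStatus {
  bool ok;
  static FakeStatus OK() { return FakeStatus{true}; }
  static FakeStatus Error() { return FakeStatus{false}; }
};

// Stand-in for an asynchronous call that can fail.
FakeStatus RunAsync(bool succeed) {
  return succeed ? FakeStatus::OK() : FakeStatus::Error();
}

FakeStatus Lookup(bool succeed) {
  return RunAsync(succeed);
  return FakeStatus::OK();  // Unreachable: never executed, but still compiles.
}

// Errors still propagate, which is why tests do not catch the dead line.
void CheckLookupPropagatesStatus() {
  assert(Lookup(true).ok);
  assert(!Lookup(false).ok);
}
```

The dead line is harmless at runtime, so only review or an unreachable-code warning will catch it.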
      }
    }
  }
}
nit: leave a blank line to be symmetric
Hmm, lint actually complains about this.
src/ray/gcs/client_test.cc
Outdated
@@ -42,7 +42,12 @@ class TestGcs : public ::testing::Test {

  virtual void Stop() = 0;

  int64_t NumCallbacks() { return num_callbacks_; }
nit: int64_t num_callbacks() const { return num_callbacks_; }
lowercase for cheap methods; const when you can
up to you
@@ -1429,6 +1642,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
    return REDISMODULE_ERR;
  }

  if (RedisModule_CreateCommand(ctx, "ray.table_request_notifications",
Side note: I probably won't be able to port all of these to credis myself...
Yeah, I can help with that. I hope though, that this interface will actually be easier to implement in credis than trying to support everything in vanilla Ray's Redis modules.
Test FAILed.
New changes LGTM
Test PASSed.
* commit 'f69cbd35d4e86f2a3c2ace875aaf8166edb69f5d': (64 commits)
  Bump version to 0.4.0. (ray-project#1745)
  Fix monitor.py bottleneck by removing excess Redis queries. (ray-project#1786)
  Convert the ObjectTable implementation to a Log (ray-project#1779)
  Acquire worker lock when importing actor. (ray-project#1783)
  Introduce a log interface for the new GCS (ray-project#1771)
  [tune] Fix linting error (ray-project#1777)
  [tune] Added pbt with keras on cifar10 dataset example (ray-project#1729)
  Add a GCS table for the xray task flatbuffer (ray-project#1775)
  [tune] Change tune resource request syntax to be less confusing (ray-project#1764)
  Remove from X import Y convention in RLlib ES. (ray-project#1774)
  Check if the provider is external before getting the config. (ray-project#1743)
  Request and cancel notifications in the new GCS API (ray-project#1758)
  Fix resource bookkeeping for blocked actor methods. (ray-project#1766)
  Fix bug when connecting another driver in local case. (ray-project#1760)
  Define string prefixes for all tables in the new GCS API (ray-project#1755)
  [rllib] Update RLlib to work with new actor scheduling behavior (ray-project#1754)
  Redirect output of all processes by default. (ray-project#1752)
  Add API for getting total cluster resources. (ray-project#1736)
  Always send actor creation tasks to the global scheduler. (ray-project#1757)
  Print error when actor takes too long to start, and refactor error me… (ray-project#1747)
  ...
# Conflicts:
#	python/ray/rllib/__init__.py
#	python/ray/rllib/dqn/dqn.py
#	python/ray/rllib/dqn/dqn_evaluator.py
#	python/ray/rllib/dqn/dqn_replay_evaluator.py
#	python/ray/rllib/optimizers/__init__.py
#	python/ray/rllib/tuned_examples/pong-dqn.yaml
What do these changes do?
This allows a caller to request and cancel notifications about a specific key in a GCS table.
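As a rough mental model of the request/cancel semantics described above, here is a minimal in-memory sketch. It is not the PR's implementation, which lives in Redis module commands; NotificationRegistry, its method names, and the key and client strings are all hypothetical.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical in-memory model: for each table key, track which clients
// have asked to be notified about writes to that key.
class NotificationRegistry {
 public:
  void RequestNotifications(const std::string &key, const std::string &client_id) {
    subscribers_[key].insert(client_id);
  }

  void CancelNotifications(const std::string &key, const std::string &client_id) {
    auto it = subscribers_.find(key);
    if (it == subscribers_.end()) {
      return;  // Nothing was requested for this key.
    }
    it->second.erase(client_id);
    if (it->second.empty()) {
      subscribers_.erase(it);  // Drop keys that no longer have subscribers.
    }
  }

  // Clients that should be notified when `key` is written.
  std::vector<std::string> ClientsToNotify(const std::string &key) const {
    auto it = subscribers_.find(key);
    if (it == subscribers_.end()) {
      return std::vector<std::string>();
    }
    return std::vector<std::string>(it->second.begin(), it->second.end());
  }

 private:
  std::map<std::string, std::set<std::string>> subscribers_;
};

// Usage: request for two clients, cancel one, and only the other remains.
void DemoRequestThenCancel() {
  NotificationRegistry registry;
  registry.RequestNotifications("OBJECT:1", "client_a");
  registry.RequestNotifications("OBJECT:1", "client_b");
  registry.CancelNotifications("OBJECT:1", "client_a");
  assert(registry.ClientsToNotify("OBJECT:1").size() == 1);
}
```

The point of per-key requests is that a client only receives updates for keys it has explicitly asked about, and can stop receiving them once it no longer needs them.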