Skip to content

Request and cancel notifications in the new GCS API #1758

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

stephanie-wang
Copy link
Contributor

What do these changes do?

This allows a caller to request and cancel notifications about a specific key in a GCS table.

@stephanie-wang stephanie-wang changed the title Request and cancel notifications in the new GCS API [WIP] Request and cancel notifications in the new GCS API Mar 20, 2018
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4445/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4447/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4448/
Test PASSed.

Copy link
Contributor

@concretevitamin concretevitamin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few concerns:

  • I suggest we stay away from boost. Asio, sure, but not other stuff.
  • tables.h has become larger and contains many definitions instead of pure declaration. Consider separating into a tables.cc?

RAY_CHECK(RedisModule_StringToLongLong(
pubsub_channel_str, &pubsub_channel_long) == REDISMODULE_OK)
<< "Prefix must be a valid TablePrefix";
char pubsub_channel[sizeof(TablePubsub) + 1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why +1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the null terminator, although this doesn't measure the maximum number of characters needed properly...I changed this to TablePubsub_MAX.

@@ -537,6 +617,75 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx,
return REDISMODULE_OK;
}

int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some doc

}
}

int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc?

@@ -554,14 +703,15 @@ bool is_nil(const std::string &data) {
int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 4) {
if (argc != 5) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add doc on args:

// This is a temporary redis command that will be removed once
// the GCS uses https://github.com/pcmoritz/credis.
// Be careful, this only supports Task Table payloads.
// Args:
//     prefix_str: ...
// ...

RedisModule_Call(ctx, "PUBLISH", "ss", client_channel, data);
if (reply == NULL) {
RedisModule_CloseKey(notification_key);
RedisModule_ReplyWithError(ctx, "error during PUBLISH");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be return RedisModule_ReplyWithError(ctx, "error during PUBLISH");? Otherwise the key will be closed twice (which I think is fine, just somewhat stands out as odd).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

}
}
}
RedisModule_CloseKey(notification_key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This redis command needs to return an int (OK or ERROR).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does (further down).

@@ -517,13 +596,14 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
int TableLookup_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 3) {
if (argc != 4) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document args

@@ -394,17 +447,18 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
int TableAdd_RedisCommand(RedisModuleCtx *ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This command seems very bloated now. A rule of thumb I learned was any func > 50 lines should probably be broken down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, although a lot of the bloat will hopefully go away soon, since a lot of the code is here is only necessary to support legacy code (like the task table).

@@ -24,24 +24,37 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
redisReply *reply = reinterpret_cast<redisReply *>(r);
// We use an optional response to distinguish between a nil response and a
// response of an empty string.
boost::optional<const std::string &> optional_data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use boost optional. My unpopular opinion is we shouldn't use boost at all.

boost::optional is not in the approved subset by Google style: http://google.github.io/styleguide/cppguide.html#Boost

We should transition to abseil. https://abseil.io/tips/123

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to not using boost anywhere except for asio.

@stephanie-wang stephanie-wang force-pushed the gcs-request-notifications branch from 21e2468 to f37160d Compare March 21, 2018 05:24
@stephanie-wang
Copy link
Contributor Author

  • I actually just updated this to return a std::vector instead, since it works better with the planned changes to support a set of entries in a key. Can you take a look again?
  • Sounds good.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4450/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4451/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4453/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4455/
Test PASSed.

@concretevitamin
Copy link
Contributor

concretevitamin commented Mar 21, 2018 via email

@stephanie-wang
Copy link
Contributor Author

Yes, that would be great, thanks, @concretevitamin! I made some significant changes to the gcs/tables.h API, so it would be good to get your opinion on that as well.

@stephanie-wang stephanie-wang changed the title [WIP] Request and cancel notifications in the new GCS API Request and cancel notifications in the new GCS API Mar 21, 2018
@elibol
Copy link
Contributor

elibol commented Mar 21, 2018

LGTM. Just need to make sure we coordinate this change with the soon-to-be xray-task-forwarding PR, which uses the object table lookup failure callback.

@concretevitamin
Copy link
Contributor

concretevitamin commented Mar 21, 2018 via email

@@ -53,6 +53,31 @@ static const char *table_prefixes[] = {
NULL, "TASK:", "CLIENT:", "OBJECT:", "FUNCTION:",
};

/// Parse a Redis string into a TablePubsub channel.
TablePubsub ParseTablePubsub(RedisModuleString *pubsub_channel_str) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const (to indicate no changes/freeing, etc)


/// Format a pubsub channel for a specific key. pubsub_channel_str should
/// contain a valid TablePubsub.
RedisModuleString *FormatPubsubChannel(RedisModuleCtx *ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const the args if you can

@@ -158,6 +131,9 @@ class Table {
TablePubsub pubsub_channel_;
/// The prefix to use for keys in this table.
TablePrefix prefix_;
/// Whether we have subscribed to the table yet. Each client may only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Whether" is not informative, I actually had to search for this arg to understand.

Say something like, ">= 0 iff we have subscribed, otherwise -1."

flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, data.get()));
RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified to return context_->RunAsync(..);

return true;
});
std::vector<uint8_t> nil;
RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), nil.size(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be simplified

RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string "
<< reply->str;
}

if (callback_index >= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warrants an anon-namespace helper since it repeats, in my opinion

MaybeRemoveCallback(callback_index);

for

+  if (callback_index >= 0) {
+    bool delete_callback = RedisCallbackManager::instance().get(callback_index)(data);
+    if (delete_callback) {
+      RedisCallbackManager::instance().remove(callback_index);
+    }
+  }

data = std::string(reply->str, reply->len);
} else if (reply->type == REDIS_REPLY_STATUS) {
} else if (reply->type == REDIS_REPLY_ERROR) {
std::vector<std::string> data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems data always has size 1 throughout this PR. If that's true, what's the reason of changing the iface to vector (by default I think it allocates a few more slots than the absolute minimum, which is 1; this might be wrong though)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually we want to transition to an interface where an operation appends a value instead of just overwriting it, which isn't flexible enough to support the object table, for instance. One thing that could work is if we use the std::vector interface internally, but the interface for the table caller can still take in just a single data. Would you prefer that we do that in this PR?

ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(data.size(), 1);
ASSERT_EQ(data[0].managers, std::vector<std::string>({"A", "B"}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine (and hope) ASSERT_EQ(data[0].managers, {"A", "B"}); just works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not, I think because of the ASSERT_EQ macro:

error: macro "ASSERT_EQ" passed 3 arguments, but takes just 2
   ASSERT_EQ(data[0].managers, {"A", "B"});

// Check that the object entry was added.
ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
ASSERT_EQ(data.size(), 1);
ASSERT_EQ(data[0].managers, std::vector<std::string>({"A", "B"}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(data.size(), 1);
// Check that the object entry was added.
ASSERT_EQ(data[0].managers, std::vector<std::string>({"A", "B"}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@concretevitamin
Copy link
Contributor

concretevitamin commented Mar 21, 2018 via email

@stephanie-wang
Copy link
Contributor Author

It does not fix the compiler issue.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4456/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4457/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4460/
Test PASSed.

Copy link
Contributor

@concretevitamin concretevitamin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Feel free to self-merge after addressing a few nits!

std::vector<uint8_t> nil;
return context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), nil.size(), prefix_,
pubsub_channel_, callback_index);
return Status::OK();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does double returning pass all our tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know...oh dear.

fbb.Finish(Data::Pack(fbb, data.get()));
return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, callback_index);
return Status::OK();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double returning ?!

}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: leave a blank line to be symmetric

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, lint actually complains about this.

@@ -42,7 +42,12 @@ class TestGcs : public ::testing::Test {

virtual void Stop() = 0;

int64_t NumCallbacks() { return num_callbacks_; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: int64_t num_callbacks() const { return num_callbacks_; }

smaller case for cheap methods; const when you can

up to you

@@ -1429,6 +1642,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.table_request_notifications",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: I probably won't be able to port all of these to credis myself...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can help with that. I hope though, that this interface will actually be easier to implement in credis than trying to support everything in vanilla Ray's Redis modules.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4467/
Test FAILed.

@concretevitamin
Copy link
Contributor

New changes LGTM

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4468/
Test PASSed.

@stephanie-wang stephanie-wang merged commit 8704c86 into ray-project:master Mar 22, 2018
@stephanie-wang stephanie-wang deleted the gcs-request-notifications branch March 22, 2018 17:31
royf added a commit to royf/ray that referenced this pull request Apr 22, 2018
* commit 'f69cbd35d4e86f2a3c2ace875aaf8166edb69f5d': (64 commits)
  Bump version to 0.4.0. (ray-project#1745)
  Fix monitor.py bottleneck by removing excess Redis queries. (ray-project#1786)
  Convert the ObjectTable implementation to a Log (ray-project#1779)
  Acquire worker lock when importing actor. (ray-project#1783)
  Introduce a log interface for the new GCS (ray-project#1771)
  [tune] Fix linting error (ray-project#1777)
  [tune] Added pbt with keras on cifar10 dataset example (ray-project#1729)
  Add a GCS table for the xray task flatbuffer (ray-project#1775)
  [tune] Change tune resource request syntax to be less confusing (ray-project#1764)
  Remove from X import Y convention in RLlib ES. (ray-project#1774)
  Check if the provider is external before getting the config. (ray-project#1743)
  Request and cancel notifications in the new GCS API (ray-project#1758)
  Fix resource bookkeeping for blocked actor methods. (ray-project#1766)
  Fix bug when connecting another driver in local case. (ray-project#1760)
  Define string prefixes for all tables in the new GCS API (ray-project#1755)
  [rllib] Update RLlib to work with new actor scheduling behavior (ray-project#1754)
  Redirect output of all processes by default. (ray-project#1752)
  Add API for getting total cluster resources. (ray-project#1736)
  Always send actor creation tasks to the global scheduler. (ray-project#1757)
  Print error when actor takes too long to start, and refactor error me… (ray-project#1747)
  ...

# Conflicts:
#	python/ray/rllib/__init__.py
#	python/ray/rllib/dqn/dqn.py
#	python/ray/rllib/dqn/dqn_evaluator.py
#	python/ray/rllib/dqn/dqn_replay_evaluator.py
#	python/ray/rllib/optimizers/__init__.py
#	python/ray/rllib/tuned_examples/pong-dqn.yaml
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants