Integrate credis with Ray & route task table entries into credis. #1841
Test PASSed.
Can you fix the conflicts so the Travis CI will run?
Looks good to me, but let's not make the Python linting changes as part of this PR. If we want them we should commit the style file and have a PR with only linting changes. Also we need to make sure all the Travis tests pass before merging.
d82f8b0
to
57d181e
Compare
Rebased and pushed, tests running now. For the reformatting, I'm happy to look into forcing For the current PR, it'd be a lot of work to isolate my changes out. Plus, I believe formatting that file is strictly better than the status-quo (unformatted).
BTW, the style file has been commented for a long time, in Simply type
Looks pretty good! Main request is to not expose the `CommandType` in `gcs/tables.h` (see detailed comment below).
python/ray/services.py
Outdated
 def start_redis(node_ip_address,
                 port=None,
                 redis_shard_ports=None,
                 num_redis_shards=1,
                 redis_max_clients=None,
                 redirect_output=False,
                 redirect_worker_output=False,
-                cleanup=True):
+                cleanup=True,
+                no_credis=True):
Maybe rename to `use_credis=False` to match the naming scheme for `use_raylet`.
python/ray/services.py
Outdated
stderr_file=redis_stderr_file,
cleanup=cleanup)
else:
# It is import to load the credis module BEFORE the ray module, as the
Is the relevant line for this at 467? If yes, move the comment down to that line (also "import" --> "important").
python/ray/services.py
Outdated
stderr_file=redis_stderr_file,
cleanup=cleanup)
else:
# It is import to load the credis module BEFORE the ray module, as the
Same here for moving the comment to the relevant line.
//
// extern RedisChainModule module;
// int MyCmd_RedisModuleCmd(...) {
//   return module.Mutate(..., NodeFunc, TailFunc);
Nice!
// Runs "node_func" on every node in the chain; after the tail node has run it
// too, finalizes the mutation by running "tail_func".
// TODO(zongheng): currently only supports 1-node chain.
int Mutate(RedisModuleCtx *ctx,
I would make this name more descriptive of the use case. Maybe something like `ChainReplicate`.
// A function that runs on every node in the chain. Type:
//   (context, argv, argc, (can be nullptr) mutated_key_str) -> int
//
// If the fourth arg is passed, NodeFunc must fill in the key being mutated.
I'm a little confused by this comment. Can you be more specific about this optional argument, e.g., when it should be passed? Also, what does `NodeFunc` fill in the key with?
#include "redismodule.h"

// NOTE(zongheng): this duplicated declaration serves as forward-declaration
// only. The implementation is supposed to be linked in from credis. In
Can you specify where exactly the implementation is in the comment? I'm having trouble finding it in the credis repo.
// }
//
// See, for instance, ChainTableAdd_RedisCommand in ray_redis_module.cc.
class RedisChainModule {
Not necessary for this PR, but a suggestion to make it easier to port over other commands in the future is to define another module that implements the common interface, but is for non-chain-replicated commands, like `RedisModule`. Then, I believe we could do something like this in `ray_redis_module.cc`:
#if RAY_USE_NEW_GCS
module = RedisChainModule;
#else
module = RedisModule;
#endif
if (RedisModule_CreateCommand(ctx, "ray.table_add",
module.Mutate(TableAdd_DoWrite, TableAdd_DoPublish), "write pubsub", 0,
0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
src/ray/gcs/tables.h
Outdated
@@ -223,6 +226,10 @@ class Table : private Log<ID, Data>, public TableInterface<ID, Data> {
  Status Add(const JobID &job_id, const ID &id, std::shared_ptr<DataT> data,
             const WriteCallback &done);

  /// A general version of Add() that supports different CommandType's.
  Status Add(const JobID &job_id, const ID &id, std::shared_ptr<DataT> data,
I don't think it makes sense to expose the command type in the `Table` interface. Then everyone who calls operations on a table will have to think about whether the table is chain-replicated or not. And if both versions of a table are supported, like for the task table, then there will be no error if we try to `Add` to the chain-replicated version in one place and `Add` to the non-chain-replicated version in another. Can you hide the `CommandType` by moving it into the implementation (e.g., set it to `kRegular` in the default `Table` constructor and set it to `kChain` in the task table derived classes)?
I went ahead and investigated per our offline discussion. Here's another thought. If we make `kRegular`/`kChain` a ctor arg for `AsyncGcsClient` and various `Table`s, it gives the reader a hint that all commands with such a client/table pair will be chain-enabled. Which is not the case; therefore I think incrementally adding such support to what's actually been enabled, e.g. `Table::Add(..., CommandType _)`, makes more sense.
What do you think?
I still don't think it makes sense to expose it in the `Add` interface. From the caller's perspective, I think it's clearest if we make the granularity for chain replication at the level of a table, not a command. For instance, I don't really understand what it means for `Add` to be supported but not `Lookup`. If your concern is about misleading the caller, I would just add a check on any unsupported methods that the `CommandType` flag is set to `kRegular`.
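To make the table-level proposal concrete, here is a rough sketch in Python (the PR itself is C++). It assumes the replication mode is fixed when the table is constructed and that any method not yet ported checks the mode before running. The class layout and the `append` method are hypothetical and only illustrate the idea; `TableAdd` and `ChainTableAdd` are used purely as labels for the two code paths.

```python
from enum import Enum


class CommandType(Enum):
    REGULAR = "regular"
    CHAIN = "chain"


class Table:
    """Toy stand-in for a GCS table; the mode is fixed at construction."""

    def __init__(self, command_type=CommandType.REGULAR):
        self._command_type = command_type
        self._entries = {}

    def add(self, key, value):
        # Callers never pass a CommandType; the table picks the code path.
        self._entries[key] = value
        if self._command_type is CommandType.CHAIN:
            return "ChainTableAdd"
        return "TableAdd"

    def lookup(self, key):
        return self._entries.get(key)

    def append(self, key, value):
        # Hypothetical method that has not been ported to chain replication:
        # fail loudly instead of silently taking the regular path.
        if self._command_type is not CommandType.REGULAR:
            raise NotImplementedError("append is not chain-replicated yet")
        self._entries.setdefault(key, []).append(value)


class TaskTable(Table):
    """The task table opts into chain replication in its constructor."""

    def __init__(self):
        super().__init__(CommandType.CHAIN)
```

With this shape, a caller holding a `TaskTable` cannot mix chain and non-chain writes, because the choice is never part of the call.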
"For instance, I don't really understand what it means for Add to be supported but not Lookup" <- this is a great argument & feedback. I agree. Let me see what I can do.
Linting is being run here: https://github.com/ray-project/ray/blob/master/.travis.yml#L33
Test PASSed.
python/ray/services.py
Outdated

# Flatten to ["--loadmodule", "path/m1", ...].
loadmodule = list(
    itertools.chain.from_iterable([["--loadmodule", m] for m in modules]))
itertools seems too heavy-handed for just this line, and it seems a bit hard to understand. How about
load_module_args = []
for module in modules:
    load_module_args += ["--loadmodule", module]
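As a quick check that the two forms are interchangeable, the sketch below builds the argument list both ways and compares them. The module paths are placeholders, not real paths from the build.

```python
import itertools

# Placeholder module paths, for illustration only.
modules = ["path/m1", "path/m2"]

# Original form: build one ["--loadmodule", m] pair per module, then flatten.
flattened = list(
    itertools.chain.from_iterable(["--loadmodule", m] for m in modules))

# Suggested form: a plain loop that extends the list in place.
load_module_args = []
for module in modules:
    load_module_args += ["--loadmodule", module]

assert flattened == load_module_args
print(load_module_args)
```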
python/ray/services.py
Outdated
counter = 0
if port is not None:
    # If a port is specified, then try only once to connect.
    num_retries = 1
else:
    port = new_port()

# Flatten to ["--loadmodule", "path/m1", ...].
loadmodule = list(
`loadmodule` -> `load_module_args` (or at least `load_module`)
Test PASSed.
Yes, as @pcmoritz said, this is difficult to review due to all of the formatting changes. I see that you opened a different PR for that, please remove the formatting changes from this PR.
537a912
to
66549e2
Compare
Addressed all comments, PTAL, thanks.
Test FAILed.
Looks good! We should probably also add a Python test in this PR for the `RAY_USE_NEW_GCS` flag, so we can test the `use_credis` codepath.
src/ray/gcs/tables.h
Outdated
@@ -183,7 +190,7 @@ class TableInterface {
 /// Example tables backed by Log:
 /// TaskTable: Stores Task metadata needed for executing the task.
 template <typename ID, typename Data>
-class Table : private Log<ID, Data>, public TableInterface<ID, Data> {
+class Table : public Log<ID, Data>, public TableInterface<ID, Data> {
Why change this to `public`?
Not much reason besides the styleguide prefers to always use public inheritance :)
Hmm, as I said in the original PR, I don't think it makes sense to follow the style guide in this case. `public` actually seems worse since it exposes methods that shouldn't be called on a `Table` at all. If you have a reason other than style, I'm open to this change, but that alone shouldn't be the reason for making it `public`.
src/ray/gcs/tables.cc
Outdated
@@ -160,8 +159,14 @@ Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);
  fbb.Finish(Data::Pack(fbb, data.get()));
  return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(),
                            prefix_, pubsub_channel_, callback_index);
  if (command_type_ == CommandType::kRegular) {
Does it make sense to add checks for `command_type_ == CommandType::kRegular` for unsupported commands?
Add(), Lookup(), Subscribe() are all supported under the current configuration (1-node chain), so I opted to not add these CHECKs.
@stephanie-wang We couldn't really write a Python test for this, because we need the env var `RAY_USE_NEW_GCS` set during build and run time.
@concretevitamin, couldn't we run a Python test with a flag passed in from the command line? I'm a little wary of merging a codepath that isn't tested at all in the CI.
@stephanie-wang when we're in a Python test, we have no idea whether the backend is built / compiled with the flag on. If it is not, and at runtime we try to launch credis, things will break.
@concretevitamin, but don't we already do something similar for the tests that are skipped if Anyway, I don't really know what the best way to test is, but the point is that ideally we should find some way to test both the build and the codepath in CI.
I think we should be able to test it in the
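One way the skip-based idea from this exchange could look is sketched below with the standard `unittest` module: the test runs only when `RAY_USE_NEW_GCS=on` is present in the environment and is skipped otherwise. The test class and its body are hypothetical placeholders. As noted above, the environment variable at test time does not by itself prove that the backend was compiled with the flag, so this assumes CI sets it for both the build step and the test step.

```python
import os
import unittest

# Assumption: CI exports RAY_USE_NEW_GCS=on for both the build and the tests.
USE_NEW_GCS = os.environ.get("RAY_USE_NEW_GCS") == "on"


class CredisCodepathTest(unittest.TestCase):
    """Hypothetical test case; the real test would start Ray with credis."""

    @unittest.skipUnless(USE_NEW_GCS, "requires RAY_USE_NEW_GCS=on")
    def test_use_credis(self):
        # Placeholder assertion standing in for the actual credis checks.
        self.assertTrue(USE_NEW_GCS)
```

Run with `python -m unittest` in a normal build and the test is reported as skipped; run with `RAY_USE_NEW_GCS=on python -m unittest` and it executes.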
66549e2
to
96ad840
Compare
Test FAILed.
Addressed comments & pushed, PTAL
Test PASSed.
68498e1
to
ecb3cab
Compare
Test FAILed.
Test FAILed.
f2a17c6
to
0a79cca
Compare
@robertnishihara addressed, PTAL
Test PASSed.
Relevant test failure in
Probably the same issue
python/ray/services.py
Outdated
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
+from __future__ import absolute_import, division, print_function
Can you revert this change? We do it the other way in all of the other files.
Test PASSed.
python/ray/services.py
Outdated
@@ -618,7 +606,8 @@ def start_redis_instance(node_ip_address="127.0.0.1",
             port = new_port()
         counter += 1
     if counter == num_retries:
-        raise Exception("Couldn't start Redis.")
+        raise Exception("Couldn't start Redis. Check stderr file " +
+                        stderr_file.name)
`stderr_file` can be `None`, maybe just do
raise Exception("Couldn't start Redis. Check stderr file {}."
                .format(stderr_file))
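A slightly more defensive variant is sketched below, assuming `stderr_file` is either `None` or a file-like object with a `name` attribute. The helper name `redis_start_error` is hypothetical and not part of `services.py`.

```python
def redis_start_error(stderr_file=None):
    """Build the failure exception; stderr_file may be None or a file object."""
    # getattr avoids the AttributeError that stderr_file.name raises on None.
    location = getattr(stderr_file, "name", None)
    if location is None:
        return Exception("Couldn't start Redis.")
    return Exception(
        "Couldn't start Redis. Check stderr file {}.".format(location))
```

The caller would then write `raise redis_start_error(stderr_file)` at the point where the retries are exhausted.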
counter = 0
if port is not None:
    # If a port is specified, then try only once to connect.
    num_retries = 1
else:
    port = new_port()

If you look at the above code block, retries should happen only when the port is not specified, and in that case a different random port should be tried each time. This is inconsistent with the error message I'm seeing:
testInvalidTaskTableAdd (__main__.TestGlobalStateStore) ... 14397:M 24 May 03:52:08.898 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
14397:M 24 May 03:52:08.898 # Server started, Redis version 3.9.102
14397:M 24 May 03:52:08.898 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
14397:M 24 May 03:52:08.898 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
Waiting for redis server at 127.0.0.1:51098 to respond...
14401:M 24 May 03:52:09.004 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14402:M 24 May 03:52:09.108 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14403:M 24 May 03:52:09.212 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14404:M 24 May 03:52:09.316 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14405:M 24 May 03:52:09.420 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14406:M 24 May 03:52:09.524 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14407:M 24 May 03:52:09.628 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14408:M 24 May 03:52:09.732 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14409:M 24 May 03:52:09.836 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14410:M 24 May 03:52:09.940 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14411:M 24 May 03:52:10.044 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14412:M 24 May 03:52:10.147 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14413:M 24 May 03:52:10.251 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14414:M 24 May 03:52:10.355 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14415:M 24 May 03:52:10.459 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14416:M 24 May 03:52:10.563 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14417:M 24 May 03:52:10.667 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14418:M 24 May 03:52:10.771 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14419:M 24 May 03:52:10.875 # Creating Server TCP listening socket *:59530: bind: Address already in use
Redis failed to start, retrying now.
14420:M 24 May 03:52:10.979 # Creating Server TCP listening socket *:59530: bind: Address already in use
ERROR
Any idea about this?
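For reference, the retry behaviour that the code block above is meant to have can be sketched as follows. This is not the code from `services.py`: `start_with_retries` and the `try_start` callback are hypothetical, and `new_port` here is a stand-in that just picks a random port.

```python
import random


def new_port():
    # Stand-in for the helper used in services.py: pick a random high port.
    return random.randint(10000, 65535)


def start_with_retries(try_start, port=None, num_retries=20):
    """Call try_start(port) until it returns True; return the port that worked.

    try_start is a hypothetical callback that launches the server on the
    given port and reports whether it came up.
    """
    if port is not None:
        # If a port is specified, then try only once to connect.
        num_retries = 1
    else:
        port = new_port()
    for attempt in range(num_retries):
        if try_start(port):
            return port
        if attempt + 1 < num_retries:
            # Draw a fresh port so the next attempt does not rebind the
            # address that just failed.
            port = new_port()
    raise Exception("Couldn't start Redis.")
```

The important property is that the port handed to each launch attempt is the freshly drawn one, so a log produced by this loop would not normally show the same "Address already in use" port on every retry.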
python/ray/services.py
Outdated
-        raise Exception("Couldn't start Redis. Check stderr file " +
-                        stderr_file.name)
+        raise Exception("Couldn't start Redis. Check stdout file " +
+                        stdout_file.name)
This is still incorrect because `stdout_file` can be `None`.
Test PASSed.
Test PASSed.
Linting is still failing.
Tests passed now.
Test PASSed.
Green now.
* master:
  [autoscaler] GCP node provider (ray-project#2061)
  [xray] Evict tasks from the lineage cache (ray-project#2152)
  [ASV] Add ray.init and simple Ray benchmarks (ray-project#2166)
  Re-encrypt key for uploading to S3 from travis to use travis-ci.com. (ray-project#2169)
  [rllib] Fix A3C PyTorch implementation (ray-project#2036)
  [JavaWorker] Do not kill local-scheduler-forked workers in RunManager.cleanup (ray-project#2151)
  Update Travis CI badge from travis-ci.org to travis-ci.com. (ray-project#2155)
  Implement Python global state API for xray. (ray-project#2125)
  [xray] Improve flush algorithm for the lineage cache (ray-project#2130)
  Fix support for actor classmethods (ray-project#2146)
  Add empty df test (ray-project#1879)
  [JavaWorker] Enable java worker support (ray-project#2094)
  [DataFrame] Fixing the code formatting of the tests (ray-project#2123)
  Update resource documentation (remove outdated limitations). (ray-project#2022)
  bugfix: use array redis_primary_addr out of its scope (ray-project#2139)
  Fix infinite retry in Push function. (ray-project#2133)
  [JavaWorker] Changes to the directory under src for support java worker (ray-project#2093)
  Integrate credis with Ray & route task table entries into credis. (ray-project#1841)
Quick overview of this change.
How credis is loaded. Previously credis servers were separate from redis servers. With this change, the primary additionally loads credis' `libmaster.so`, and the (assumed to be only 1) redis shard loads credis' `libmember.so`, acting as the singleton node in a 1-node chain.
Changes to `ray_redis_module.cc`. `TableAdd` gets a new variant, `ChainTableAdd`. I believe this should serve as a good example if others wish to port other commands in the future ;)
Testing. All tests related to adding entries to legacy / raylet task tables in `client_test.cc` have been augmented to test with credis.
To build and launch manually. To build, `RAY_USE_NEW_GCS=on pip install -e . --verbose`. To launch, `RAY_USE_NEW_GCS=on ipython`.
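The loading scheme described in the overview can be sketched roughly as below: pick the credis module by role, put it ahead of the Ray module (the code comment in `services.py` says credis must be loaded before the ray module), and expand the list into `--loadmodule` arguments for `redis-server`. All paths, the role names, and both helper functions are hypothetical placeholders rather than the actual `services.py` code.

```python
# Placeholder paths; the real locations depend on the build layout.
RAY_MODULE = "path/to/ray_redis_module.so"
CREDIS_MASTER = "path/to/libmaster.so"
CREDIS_MEMBER = "path/to/libmember.so"


def modules_for(role, use_credis):
    """Return the modules to load for a 'primary' or 'shard' redis server."""
    if not use_credis:
        return [RAY_MODULE]
    credis = CREDIS_MASTER if role == "primary" else CREDIS_MEMBER
    # credis goes first so that it is loaded before the ray module.
    return [credis, RAY_MODULE]


def redis_command(port, modules):
    """Build the redis-server argument list with one --loadmodule per module."""
    command = ["redis-server", "--port", str(port)]
    for module in modules:
        command += ["--loadmodule", module]
    return command


print(redis_command(6379, modules_for("primary", use_credis=True)))
```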