Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS] impl RedisStoreClient for GCS Service #7675

Merged
merged 46 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
64d5b6f
add StoreClient to GCS
micafan Mar 10, 2020
3ad0401
add StoreClient to GCS
micafan Mar 10, 2020
1c132cd
fix lint
micafan Mar 10, 2020
0d23783
update ScanCallback
micafan Mar 10, 2020
a15d4d1
fix lint
micafan Mar 10, 2020
9bb6ee7
add RedisClient
micafan Mar 13, 2020
7057470
finish AsyncPut
micafan Mar 13, 2020
0a17db0
update delete method
micafan Mar 13, 2020
847becb
add RedisRangeOpExecutor for complex operations
micafan Mar 16, 2020
5b29e1b
use primary shard only
micafan Mar 17, 2020
ed9ba63
CallbackReply support parse scan array
micafan Mar 17, 2020
afea61c
update IOServicePool
micafan Mar 17, 2020
1cb8a31
add build target redis_client
micafan Mar 17, 2020
e5b50fc
add build target redis_store_client
micafan Mar 17, 2020
c4b7f1a
fix lint
micafan Mar 17, 2020
279bd65
opt code: rename var, add if logic
micafan Mar 17, 2020
66ee193
add UT for RedisStoreClient
micafan Mar 18, 2020
4fc719a
fix bug(pass test: Get, Put, GetAll)
micafan Mar 18, 2020
a753510
fix UT
micafan Mar 18, 2020
3c69a85
update comments
micafan Mar 18, 2020
654c3e9
remove ut dependency to redis module;
micafan Mar 19, 2020
4104fa5
fix bugs of deconstruct
micafan Mar 20, 2020
ebde8c1
keep RedisStoreClient simple: only support Get/Put/Delete
micafan Mar 20, 2020
21afff7
Merge branch 'master' into redis_impl_kv_store_step_one
micafan Mar 24, 2020
66ad7f8
change StoreClient to template class
micafan Mar 25, 2020
d137bdb
update comments
micafan Mar 25, 2020
c27ff2d
fix comments
micafan Mar 25, 2020
9295492
fix comments
micafan Mar 25, 2020
83c1cd6
add license
micafan Mar 25, 2020
d05de8d
rm interface AsyncGetByIndex from StoreClient
micafan Mar 25, 2020
d4f2f23
rm useless config
micafan Mar 25, 2020
64bd29a
fix comments
micafan Mar 25, 2020
21f74f3
Merge branch 'master' into redis_impl_kv_store_step_one
micafan Mar 26, 2020
5b76a92
make RedisStoreClient as template class
micafan Mar 26, 2020
07b67be
use unique_ptr in IOServicePool
micafan Mar 26, 2020
9dda67e
update interface of StoreClient: pb instead of template
micafan Mar 30, 2020
7ea2254
fix lint
micafan Mar 30, 2020
a1a1811
change StoreClient to template class
micafan Mar 30, 2020
d9bc4b6
fix lint
micafan Mar 30, 2020
627f37c
update comments
micafan Mar 30, 2020
94df0f4
rm useless head file
micafan Mar 30, 2020
164c79a
refine comment
raulchen Mar 30, 2020
6e80858
add define of ActorStoreTable and RedisActorStoreTable
micafan Mar 30, 2020
86cef1d
Merge branch 'redis_impl_kv_store_step_one_a' into redis_impl_kv_stor…
micafan Mar 30, 2020
4e93e10
rename SecondaryKey to IndexKey
micafan Mar 30, 2020
f7c0ab3
fix bazel format
micafan Mar 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,73 @@ alias(
actual = "@com_github_antirez_redis//:hiredis",
)

cc_library(
name = "redis_client",
srcs = [
"src/ray/gcs/asio.cc",
"src/ray/gcs/redis_async_context.cc",
"src/ray/gcs/redis_client.cc",
"src/ray/gcs/redis_context.cc",
],
hdrs = [
"src/ray/gcs/asio.h",
"src/ray/gcs/redis_async_context.h",
"src/ray/gcs/redis_client.h",
"src/ray/gcs/redis_context.h",
],
copts = COPTS,
deps = [
":hiredis",
":ray_common",
":ray_util",
":stats_lib",
"@boost//:asio",
],
)

cc_library(
name = "redis_store_client",
srcs = [
"src/ray/gcs/store_client/redis_store_client.cc",
],
hdrs = [
"src/ray/gcs/callback.h",
"src/ray/gcs/store_client/store_client.h",
"src/ray/gcs/store_client/redis_store_client.h",
],
copts = COPTS,
deps = [
"redis_client",
],
)

cc_library(
name = "store_client_test_lib",
hdrs = [
"src/ray/gcs/store_client/test/store_client_test_base.h",
],
copts = COPTS,
deps = [
"redis_store_client",
],
)

cc_test(
name = "redis_store_client_test",
srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"],
args = ["$(location redis-server) $(location redis-cli)"],
copts = COPTS,
data = [
"//:redis-cli",
"//:redis-server",
],
deps = [
":redis_store_client",
":store_client_test_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "gcs",
srcs = glob(
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
/// When this happens, gcs server will kill itself.
RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100)

/// Maximum number of keys in one batch to scan from redis.
RAY_CONFIG(uint32_t, gcs_redis_scan_batch_size, 1000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's not used, let's add it in the next PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.


/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
RAY_CONFIG(int32_t, object_store_full_max_retries, 5)
Expand Down
12 changes: 9 additions & 3 deletions src/ray/common/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ void RedisServiceManagerForTest::SetUpTestCase() {
// Use random port to avoid port conflicts between UTs.
REDIS_SERVER_PORT = random_gen(gen);

std::string start_redis_command =
REDIS_SERVER_EXEC_PATH + " --loglevel warning --loadmodule " +
REDIS_MODULE_LIBRARY_PATH + " --port " + std::to_string(REDIS_SERVER_PORT) + " &";
std::string load_module_command;
if (!REDIS_MODULE_LIBRARY_PATH.empty()) {
// Fill load module command.
load_module_command = "--loadmodule " + REDIS_MODULE_LIBRARY_PATH;
}

std::string start_redis_command = REDIS_SERVER_EXEC_PATH + " --loglevel warning " +
load_module_command + " --port " +
std::to_string(REDIS_SERVER_PORT) + " &";
RAY_LOG(INFO) << "Start redis command is: " << start_redis_command;
RAY_CHECK(system(start_redis_command.c_str()) == 0);
usleep(200 * 1000);
Expand Down
11 changes: 11 additions & 0 deletions src/ray/gcs/callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ using SubscribeCallback = std::function<void(const ID &id, const Data &result)>;
template <typename Data>
using ItemCallback = std::function<void(const Data &result)>;

/// This callback is used to receive scan result from storage.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// This callback is used to receive scan result from storage.
/// This callback is used to receive a large amount of results.
/// It will be called one or multiple times, with one small chunk of results each time, until `has_more` is false.

scan seems to be unclear and specific to redis. What about naming it ChunkedCallback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/// \param status Status indicates whether the scan was successful.
/// \param has_more Whether more data will be called back.
/// If the callback returns `has_more == true`, means the scan is not complete,
/// there has more data to be received. This callback will be called again.
/// If the callback returns `has_more == false`, means the scan is complete.
/// \param result The item returned by storage.
template <typename Data>
using SegmentedCallback =
std::function<void(Status status, bool has_more, const std::vector<Data> &result)>;
Copy link
Contributor

@ffbin ffbin Mar 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use has_next instead of has_more?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has_more is more specific than has_next, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use const Status &status instead of Status status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok use Status instead of const &. We can use move to avoid copy if necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it has_more == true, how do I query the the remaining results?
I think maybe we can return a iterator object which offers HasNext and GetNext methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RedisStoreClient will callback again when it receives more data from redis. It's a bit like the subscribe callback.
I did think to return a iterator in callback. But that will make the api more complex to use.


} // namespace gcs

} // namespace ray
Expand Down
164 changes: 164 additions & 0 deletions src/ray/gcs/redis_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#include "ray/gcs/redis_client.h"

#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_context.h"

namespace ray {

namespace gcs {

static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses,
Copy link
Contributor

@ffbin ffbin Mar 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add function comments.

std::vector<int> *ports) {
// Get the total number of Redis shards in the system.
int num_attempts = 0;
redisReply *reply = nullptr;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the number of Redis shards from the primary shard. If the
// entry is present, exit.
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards"));
if (reply->type != REDIS_REPLY_NIL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will reply be nullptr when redis is inactive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reply will not be nullptr.

break;
}

// Sleep for a little, and try again if the entry isn't there yet.
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "No entry found for NumRedisShards";
RAY_CHECK(reply->type == REDIS_REPLY_STRING)
<< "Expected string, found Redis type " << reply->type << " for NumRedisShards";
int num_redis_shards = atoi(reply->str);
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, "
<< "found " << num_redis_shards;
freeReplyObject(reply);

// Get the addresses of all of the Redis shards.
num_attempts = 0;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the Redis shard locations from the primary shard. If we find
// that all of them are present, exit.
reply =
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1"));
if (static_cast<int>(reply->elements) == num_redis_shards) {
break;
}

// Sleep for a little, and try again if not all Redis shard addresses have
// been added yet.
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "Expected " << num_redis_shards << " Redis shard addresses, found "
<< reply->elements;

// Parse the Redis shard addresses.
for (size_t i = 0; i < reply->elements; ++i) {
// Parse the shard addresses and ports.
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING);
std::string addr;
std::stringstream ss(reply->element[i]->str);
getline(ss, addr, ':');
addresses->emplace_back(std::move(addr));
int port;
ss >> port;
ports->emplace_back(port);
}
freeReplyObject(reply);
}

RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {}

Status RedisClient::Connect(boost::asio::io_service &io_service) {
std::vector<boost::asio::io_service *> io_services;
io_services.emplace_back(&io_service);
return Connect(io_services);
}

Status RedisClient::Connect(std::vector<boost::asio::io_service *> io_services) {
RAY_CHECK(!is_connected_);
RAY_CHECK(!io_services.empty());

if (options_.server_ip_.empty()) {
RAY_LOG(ERROR) << "Failed to connect, redis server address is empty.";
return Status::Invalid("Redis server address is invalid!");
}

primary_context_ = std::make_shared<RedisContext>(*io_services[0]);

RAY_CHECK_OK(primary_context_->Connect(options_.server_ip_, options_.server_port_,
/*sharding=*/true,
/*password=*/options_.password_));

if (!options_.is_test_client_) {
// Moving sharding into constructor defaultly means that sharding = true.
// This design decision may worth a look.
std::vector<std::string> addresses;
std::vector<int> ports;
GetRedisShards(primary_context_->sync_context(), &addresses, &ports);
if (addresses.empty()) {
RAY_CHECK(ports.empty());
addresses.push_back(options_.server_ip_);
ports.push_back(options_.server_port_);
}

for (size_t i = 0; i < addresses.size(); ++i) {
size_t io_service_index = (i + 1) % io_services.size();
boost::asio::io_service &io_service = *io_services[io_service_index];
// Populate shard_contexts.
shard_contexts_.push_back(std::make_shared<RedisContext>(io_service));
RAY_CHECK_OK(shard_contexts_[i]->Connect(addresses[i], ports[i], /*sharding=*/true,
/*password=*/options_.password_));
}
} else {
shard_contexts_.push_back(std::make_shared<RedisContext>(*io_services[0]));
RAY_CHECK_OK(shard_contexts_[0]->Connect(options_.server_ip_, options_.server_port_,
/*sharding=*/true,
/*password=*/options_.password_));
}

Attach();

is_connected_ = true;
RAY_LOG(INFO) << "RedisClient connected.";

return Status::OK();
}

void RedisClient::Attach() {
// Take care of sharding contexts.
RAY_CHECK(shard_asio_async_clients_.empty()) << "Attach shall be called only once";
for (std::shared_ptr<RedisContext> context : shard_contexts_) {
boost::asio::io_service &io_service = context->io_service();
shard_asio_async_clients_.emplace_back(
new RedisAsioClient(io_service, context->async_context()));
shard_asio_subscribe_clients_.emplace_back(
new RedisAsioClient(io_service, context->subscribe_context()));
}

boost::asio::io_service &io_service = primary_context_->io_service();
asio_async_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->async_context()));
asio_subscribe_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->subscribe_context()));
}

void RedisClient::Disconnect() {
RAY_CHECK(is_connected_);
is_connected_ = false;
RAY_LOG(INFO) << "RedisClient disconnected.";
}

std::shared_ptr<RedisContext> RedisClient::GetShardContext(const std::string &shard_key) {
static std::hash<std::string> hash;
size_t index = hash(shard_key) % shard_contexts_.size();
return shard_contexts_[index];
}

} // namespace gcs

} // namespace ray
97 changes: 97 additions & 0 deletions src/ray/gcs/redis_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#ifndef RAY_GCS_REDIS_CLIENT_H
#define RAY_GCS_REDIS_CLIENT_H

#include <map>
#include <string>

#include "ray/common/status.h"
#include "ray/gcs/asio.h"
#include "ray/util/logging.h"

namespace ray {

namespace gcs {

class RedisContext;

class RedisClientOptions {
public:
RedisClientOptions(const std::string &ip, int port, const std::string &password,
bool is_test_client = false)
: server_ip_(ip),
server_port_(port),
password_(password),
is_test_client_(is_test_client) {}

// Redis server address
std::string server_ip_;
int server_port_;

// Password of Redis.
std::string password_;

// Whether this client is used for tests.
bool is_test_client_{false};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 20 has set is_test_client_

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be used in default construction.

};

/// \class RedisClient
/// This class is used to send commands to Redis.
class RedisClient {
public:
RedisClient(const RedisClientOptions &options);

/// Connect to Redis. Non-thread safe.
/// Call this function before calling other functions.
///
/// \param io_service The event loop for this client.
/// This io_service must be single-threaded. Because `RedisAsioClient` is
/// non-thread safe.
/// \return Status
Status Connect(boost::asio::io_service &io_service);

// TODO(micafan) Maybe it's not necessary to use multi threads.
/// Connect to Redis. Non-thread safe.
/// Call this function before calling other functions.
///
/// \param io_services The event loops for this client. Each RedisContext bind to
/// an event loop. Each io_service must be single-threaded. Because `RedisAsioClient`
/// is non-thread safe.
/// \return Status
Status Connect(std::vector<boost::asio::io_service *> io_services);

/// Disconnect with Redis. Non-thread safe.
void Disconnect();

std::vector<std::shared_ptr<RedisContext>> GetShardContexts() {
return shard_contexts_;
}

std::shared_ptr<RedisContext> GetShardContext(const std::string &shard_key);

std::shared_ptr<RedisContext> GetPrimaryContext() { return primary_context_; }

protected:
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
void Attach();

RedisClientOptions options_;

/// Whether this client is connected to redis.
bool is_connected_{false};

// The following contexts write to the data shard
std::vector<std::shared_ptr<RedisContext>> shard_contexts_;
std::vector<std::unique_ptr<RedisAsioClient>> shard_asio_async_clients_;
std::vector<std::unique_ptr<RedisAsioClient>> shard_asio_subscribe_clients_;
// The following context writes everything to the primary shard
std::shared_ptr<RedisContext> primary_context_;
std::unique_ptr<RedisAsioClient> asio_async_auxiliary_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_auxiliary_client_;
};

} // namespace gcs

} // namespace ray

#endif // RAY_GCS_REDIS_CLIENT_H
Loading