-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GCS] impl RedisStoreClient for GCS Service #7675
Changes from 28 commits
64d5b6f
3ad0401
1c132cd
0d23783
a15d4d1
9bb6ee7
7057470
0a17db0
847becb
5b29e1b
ed9ba63
afea61c
1cb8a31
e5b50fc
c4b7f1a
279bd65
66ee193
4fc719a
a753510
3c69a85
654c3e9
4104fa5
ebde8c1
21afff7
66ad7f8
d137bdb
c27ff2d
9295492
83c1cd6
d05de8d
d4f2f23
64bd29a
21f74f3
5b76a92
07b67be
9dda67e
7ea2254
a1a1811
d9bc4b6
627f37c
94df0f4
164c79a
6e80858
86cef1d
4e93e10
f7c0ab3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -53,6 +53,17 @@ using SubscribeCallback = std::function<void(const ID &id, const Data &result)>; | |||||||
template <typename Data> | ||||||||
using ItemCallback = std::function<void(const Data &result)>; | ||||||||
|
||||||||
/// This callback is used to receive scan result from storage. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||
/// \param status Status indicates whether the scan was successful. | ||||||||
/// \param has_more Whether more data will be called back. | ||||||||
/// If `has_more` is true, the scan is not complete and more data remains to be | ||||||||
/// received; this callback will be invoked again. | ||||||||
/// If `has_more` is false, the scan is complete. | ||||||||
/// \param result The item returned by storage. | ||||||||
template <typename Data> | ||||||||
using SegmentedCallback = | ||||||||
std::function<void(Status status, bool has_more, const std::vector<Data> &result)>; | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about using has_next instead of has_more? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about use const Status &status instead of Status status? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's ok use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RedisStoreClient will callback again when it receives more data from redis. It's a bit like the subscribe callback. |
||||||||
|
||||||||
} // namespace gcs | ||||||||
|
||||||||
} // namespace ray | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
#include "ray/gcs/redis_client.h" | ||
|
||
#include <unistd.h> | ||
#include "ray/common/ray_config.h" | ||
#include "ray/gcs/redis_context.h" | ||
|
||
namespace ray { | ||
|
||
namespace gcs { | ||
|
||
static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add function comments. |
||
std::vector<int> *ports) { | ||
// Get the total number of Redis shards in the system. | ||
int num_attempts = 0; | ||
redisReply *reply = nullptr; | ||
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { | ||
// Try to read the number of Redis shards from the primary shard. If the | ||
// entry is present, exit. | ||
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards")); | ||
if (reply->type != REDIS_REPLY_NIL) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will reply be nullptr when redis is inactive? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reply will not be nullptr. |
||
break; | ||
} | ||
|
||
// Sleep for a little, and try again if the entry isn't there yet. | ||
freeReplyObject(reply); | ||
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); | ||
num_attempts++; | ||
} | ||
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries()) | ||
<< "No entry found for NumRedisShards"; | ||
RAY_CHECK(reply->type == REDIS_REPLY_STRING) | ||
<< "Expected string, found Redis type " << reply->type << " for NumRedisShards"; | ||
int num_redis_shards = atoi(reply->str); | ||
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, " | ||
<< "found " << num_redis_shards; | ||
freeReplyObject(reply); | ||
|
||
// Get the addresses of all of the Redis shards. | ||
num_attempts = 0; | ||
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { | ||
// Try to read the Redis shard locations from the primary shard. If we find | ||
// that all of them are present, exit. | ||
reply = | ||
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1")); | ||
if (static_cast<int>(reply->elements) == num_redis_shards) { | ||
break; | ||
} | ||
|
||
// Sleep for a little, and try again if not all Redis shard addresses have | ||
// been added yet. | ||
freeReplyObject(reply); | ||
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); | ||
num_attempts++; | ||
} | ||
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries()) | ||
<< "Expected " << num_redis_shards << " Redis shard addresses, found " | ||
<< reply->elements; | ||
|
||
// Parse the Redis shard addresses. | ||
for (size_t i = 0; i < reply->elements; ++i) { | ||
// Parse the shard addresses and ports. | ||
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING); | ||
std::string addr; | ||
std::stringstream ss(reply->element[i]->str); | ||
getline(ss, addr, ':'); | ||
addresses->emplace_back(std::move(addr)); | ||
int port; | ||
ss >> port; | ||
ports->emplace_back(port); | ||
} | ||
freeReplyObject(reply); | ||
} | ||
|
||
// Stores a copy of the connection options; no connection is made until Connect().
RedisClient::RedisClient(const RedisClientOptions &options)
    : options_(options) {
}
|
||
Status RedisClient::Connect(boost::asio::io_service &io_service) {
  // Wrap the single event loop in a one-element list and delegate to the
  // multi-event-loop overload.
  return Connect(std::vector<boost::asio::io_service *>{&io_service});
}
|
||
Status RedisClient::Connect(std::vector<boost::asio::io_service *> io_services) {
  RAY_CHECK(!is_connected_);
  RAY_CHECK(!io_services.empty());

  // Reject an unset server address up front; this is the only non-fatal failure.
  if (options_.server_ip_.empty()) {
    RAY_LOG(ERROR) << "Failed to connect, redis server address is empty.";
    return Status::Invalid("Redis server address is invalid!");
  }

  // The primary context always runs on the first event loop.
  boost::asio::io_service &primary_loop = *io_services[0];
  primary_context_ = std::make_shared<RedisContext>(primary_loop);
  RAY_CHECK_OK(primary_context_->Connect(options_.server_ip_, options_.server_port_,
                                         /*sharding=*/true,
                                         /*password=*/options_.password_));

  if (options_.is_test_client_) {
    // Test clients skip shard discovery and use a single shard context that
    // points at the configured server.
    shard_contexts_.push_back(std::make_shared<RedisContext>(primary_loop));
    RAY_CHECK_OK(shard_contexts_[0]->Connect(options_.server_ip_, options_.server_port_,
                                             /*sharding=*/true,
                                             /*password=*/options_.password_));
  } else {
    // Sharding is always requested (sharding = true) when connecting.
    // This design decision may be worth revisiting.
    std::vector<std::string> shard_addresses;
    std::vector<int> shard_ports;
    GetRedisShards(primary_context_->sync_context(), &shard_addresses, &shard_ports);
    if (shard_addresses.empty()) {
      // No shards reported: fall back to the configured server.
      RAY_CHECK(shard_ports.empty());
      shard_addresses.push_back(options_.server_ip_);
      shard_ports.push_back(options_.server_port_);
    }

    const size_t num_loops = io_services.size();
    for (size_t shard = 0; shard < shard_addresses.size(); ++shard) {
      // Spread the shard contexts over the event loops, starting one past the
      // loop used by the primary context.
      boost::asio::io_service &shard_loop = *io_services[(shard + 1) % num_loops];
      shard_contexts_.push_back(std::make_shared<RedisContext>(shard_loop));
      RAY_CHECK_OK(shard_contexts_[shard]->Connect(shard_addresses[shard],
                                                   shard_ports[shard],
                                                   /*sharding=*/true,
                                                   /*password=*/options_.password_));
    }
  }

  Attach();

  is_connected_ = true;
  RAY_LOG(INFO) << "RedisClient connected.";

  return Status::OK();
}
|
||
void RedisClient::Attach() { | ||
// Take care of sharding contexts. | ||
RAY_CHECK(shard_asio_async_clients_.empty()) << "Attach shall be called only once"; | ||
for (std::shared_ptr<RedisContext> context : shard_contexts_) { | ||
boost::asio::io_service &io_service = context->io_service(); | ||
shard_asio_async_clients_.emplace_back( | ||
new RedisAsioClient(io_service, context->async_context())); | ||
shard_asio_subscribe_clients_.emplace_back( | ||
new RedisAsioClient(io_service, context->subscribe_context())); | ||
} | ||
|
||
boost::asio::io_service &io_service = primary_context_->io_service(); | ||
asio_async_auxiliary_client_.reset( | ||
new RedisAsioClient(io_service, primary_context_->async_context())); | ||
asio_subscribe_auxiliary_client_.reset( | ||
new RedisAsioClient(io_service, primary_context_->subscribe_context())); | ||
} | ||
|
||
// Marks the client as disconnected. Only the connected flag is changed here;
// the contexts and asio clients are left untouched.
void RedisClient::Disconnect() {
  RAY_CHECK(is_connected_);
  RAY_LOG(INFO) << "RedisClient disconnected.";
  is_connected_ = false;
}
|
||
std::shared_ptr<RedisContext> RedisClient::GetShardContext(const std::string &shard_key) { | ||
static std::hash<std::string> hash; | ||
size_t index = hash(shard_key) % shard_contexts_.size(); | ||
return shard_contexts_[index]; | ||
} | ||
|
||
} // namespace gcs | ||
|
||
} // namespace ray |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
#ifndef RAY_GCS_REDIS_CLIENT_H | ||
#define RAY_GCS_REDIS_CLIENT_H | ||
|
||
#include <map> | ||
#include <string> | ||
|
||
#include "ray/common/status.h" | ||
#include "ray/gcs/asio.h" | ||
#include "ray/util/logging.h" | ||
|
||
namespace ray { | ||
|
||
namespace gcs { | ||
|
||
class RedisContext; | ||
|
||
/// \class RedisClientOptions
/// Connection settings consumed by RedisClient.
class RedisClientOptions {
 public:
  /// \param ip Address of the Redis server.
  /// \param port Port of the Redis server.
  /// \param password Password of Redis.
  /// \param is_test_client Whether this client is used for tests.
  RedisClientOptions(const std::string &ip, int port, const std::string &password,
                     bool is_test_client = false)
      : server_ip_{ip},
        server_port_{port},
        password_{password},
        is_test_client_{is_test_client} {}

  /// Address of the Redis server.
  std::string server_ip_;
  /// Port of the Redis server.
  int server_port_;

  /// Password of Redis.
  std::string password_;

  /// Whether this client is used for tests.
  bool is_test_client_{false};
};
|
||
/// \class RedisClient | ||
/// This class is used to send commands to Redis. | ||
class RedisClient { | ||
public: | ||
RedisClient(const RedisClientOptions &options); | ||
|
||
/// Connect to Redis. Non-thread safe. | ||
/// Call this function before calling other functions. | ||
/// | ||
/// \param io_service The event loop for this client. | ||
/// This io_service must be single-threaded. Because `RedisAsioClient` is | ||
/// non-thread safe. | ||
/// \return Status | ||
Status Connect(boost::asio::io_service &io_service); | ||
|
||
// TODO(micafan) Maybe it's not necessary to use multi threads. | ||
/// Connect to Redis. Non-thread safe. | ||
/// Call this function before calling other functions. | ||
/// | ||
/// \param io_services The event loops for this client. Each RedisContext bind to | ||
/// an event loop. Each io_service must be single-threaded. Because `RedisAsioClient` | ||
/// is non-thread safe. | ||
/// \return Status | ||
Status Connect(std::vector<boost::asio::io_service *> io_services); | ||
|
||
/// Disconnect with Redis. Non-thread safe. | ||
void Disconnect(); | ||
|
||
std::vector<std::shared_ptr<RedisContext>> GetShardContexts() { | ||
return shard_contexts_; | ||
} | ||
|
||
std::shared_ptr<RedisContext> GetShardContext(const std::string &shard_key); | ||
|
||
std::shared_ptr<RedisContext> GetPrimaryContext() { return primary_context_; } | ||
|
||
protected: | ||
/// Attach this client to an asio event loop. Note that only | ||
/// one event loop should be attached at a time. | ||
void Attach(); | ||
|
||
RedisClientOptions options_; | ||
|
||
/// Whether this client is connected to redis. | ||
bool is_connected_{false}; | ||
|
||
// The following contexts write to the data shard | ||
std::vector<std::shared_ptr<RedisContext>> shard_contexts_; | ||
std::vector<std::unique_ptr<RedisAsioClient>> shard_asio_async_clients_; | ||
std::vector<std::unique_ptr<RedisAsioClient>> shard_asio_subscribe_clients_; | ||
// The following context writes everything to the primary shard | ||
std::shared_ptr<RedisContext> primary_context_; | ||
std::unique_ptr<RedisAsioClient> asio_async_auxiliary_client_; | ||
std::unique_ptr<RedisAsioClient> asio_subscribe_auxiliary_client_; | ||
}; | ||
|
||
} // namespace gcs | ||
|
||
} // namespace ray | ||
|
||
#endif // RAY_GCS_REDIS_CLIENT_H |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's not used, let's add it in the next PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.