-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GCS] impl RedisStoreClient for GCS Service #7675
Changes from 41 commits
64d5b6f
3ad0401
1c132cd
0d23783
a15d4d1
9bb6ee7
7057470
0a17db0
847becb
5b29e1b
ed9ba63
afea61c
1cb8a31
e5b50fc
c4b7f1a
279bd65
66ee193
4fc719a
a753510
3c69a85
654c3e9
4104fa5
ebde8c1
21afff7
66ad7f8
d137bdb
c27ff2d
9295492
83c1cd6
d05de8d
d4f2f23
64bd29a
21f74f3
5b76a92
07b67be
9dda67e
7ea2254
a1a1811
d9bc4b6
627f37c
94df0f4
164c79a
6e80858
86cef1d
4e93e10
f7c0ab3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,17 @@ using SubscribeCallback = std::function<void(const ID &id, const Data &result)>; | |
template <typename Data> | ||
using ItemCallback = std::function<void(const Data &result)>; | ||
|
||
/// This callback is used to receive a large amount of results. | ||
/// \param status Status indicates whether the scan was successful. | ||
/// \param has_more Whether more data will be called back. | ||
/// If the callback returns `has_more == true`, means the scan is not complete, | ||
/// there has more data to be received. This callback will be called again. | ||
/// If the callback returns `has_more == false`, means the scan is complete. | ||
/// \param result The item returned by storage. | ||
template <typename Data> | ||
using SegmentedCallback = | ||
std::function<void(Status status, bool has_more, const std::vector<Data> &result)>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about use const Status &status instead of Status status? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's ok use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RedisStoreClient will callback again when it receives more data from redis. It's a bit like the subscribe callback. |
||
|
||
} // namespace gcs | ||
|
||
} // namespace ray | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
// Copyright 2017 The Ray Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include "ray/gcs/redis_client.h" | ||
|
||
#include <unistd.h> | ||
#include "ray/common/ray_config.h" | ||
#include "ray/gcs/redis_context.h" | ||
|
||
namespace ray { | ||
|
||
namespace gcs { | ||
|
||
static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add function comments. |
||
std::vector<int> *ports) { | ||
// Get the total number of Redis shards in the system. | ||
int num_attempts = 0; | ||
redisReply *reply = nullptr; | ||
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { | ||
// Try to read the number of Redis shards from the primary shard. If the | ||
// entry is present, exit. | ||
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards")); | ||
if (reply->type != REDIS_REPLY_NIL) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will reply be nullptr when redis is inactive? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reply will not be nullptr. |
||
break; | ||
} | ||
|
||
// Sleep for a little, and try again if the entry isn't there yet. | ||
freeReplyObject(reply); | ||
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); | ||
num_attempts++; | ||
} | ||
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries()) | ||
<< "No entry found for NumRedisShards"; | ||
RAY_CHECK(reply->type == REDIS_REPLY_STRING) | ||
<< "Expected string, found Redis type " << reply->type << " for NumRedisShards"; | ||
int num_redis_shards = atoi(reply->str); | ||
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, " | ||
<< "found " << num_redis_shards; | ||
freeReplyObject(reply); | ||
|
||
// Get the addresses of all of the Redis shards. | ||
num_attempts = 0; | ||
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { | ||
// Try to read the Redis shard locations from the primary shard. If we find | ||
// that all of them are present, exit. | ||
reply = | ||
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1")); | ||
if (static_cast<int>(reply->elements) == num_redis_shards) { | ||
break; | ||
} | ||
|
||
// Sleep for a little, and try again if not all Redis shard addresses have | ||
// been added yet. | ||
freeReplyObject(reply); | ||
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); | ||
num_attempts++; | ||
} | ||
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries()) | ||
<< "Expected " << num_redis_shards << " Redis shard addresses, found " | ||
<< reply->elements; | ||
|
||
// Parse the Redis shard addresses. | ||
for (size_t i = 0; i < reply->elements; ++i) { | ||
// Parse the shard addresses and ports. | ||
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING); | ||
std::string addr; | ||
std::stringstream ss(reply->element[i]->str); | ||
getline(ss, addr, ':'); | ||
addresses->emplace_back(std::move(addr)); | ||
int port; | ||
ss >> port; | ||
ports->emplace_back(port); | ||
} | ||
freeReplyObject(reply); | ||
} | ||
|
||
RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {} | ||
|
||
Status RedisClient::Connect(boost::asio::io_service &io_service) { | ||
std::vector<boost::asio::io_service *> io_services; | ||
io_services.emplace_back(&io_service); | ||
return Connect(io_services); | ||
} | ||
|
||
Status RedisClient::Connect(std::vector<boost::asio::io_service *> io_services) { | ||
RAY_CHECK(!is_connected_); | ||
RAY_CHECK(!io_services.empty()); | ||
|
||
if (options_.server_ip_.empty()) { | ||
RAY_LOG(ERROR) << "Failed to connect, redis server address is empty."; | ||
return Status::Invalid("Redis server address is invalid!"); | ||
} | ||
|
||
primary_context_ = std::make_shared<RedisContext>(*io_services[0]); | ||
|
||
RAY_CHECK_OK(primary_context_->Connect(options_.server_ip_, options_.server_port_, | ||
/*sharding=*/true, | ||
/*password=*/options_.password_)); | ||
|
||
if (!options_.is_test_client_) { | ||
// Moving sharding into constructor defaultly means that sharding = true. | ||
// This design decision may worth a look. | ||
std::vector<std::string> addresses; | ||
std::vector<int> ports; | ||
GetRedisShards(primary_context_->sync_context(), &addresses, &ports); | ||
if (addresses.empty()) { | ||
RAY_CHECK(ports.empty()); | ||
addresses.push_back(options_.server_ip_); | ||
ports.push_back(options_.server_port_); | ||
} | ||
|
||
for (size_t i = 0; i < addresses.size(); ++i) { | ||
size_t io_service_index = (i + 1) % io_services.size(); | ||
boost::asio::io_service &io_service = *io_services[io_service_index]; | ||
// Populate shard_contexts. | ||
shard_contexts_.push_back(std::make_shared<RedisContext>(io_service)); | ||
RAY_CHECK_OK(shard_contexts_[i]->Connect(addresses[i], ports[i], /*sharding=*/true, | ||
/*password=*/options_.password_)); | ||
} | ||
} else { | ||
shard_contexts_.push_back(std::make_shared<RedisContext>(*io_services[0])); | ||
RAY_CHECK_OK(shard_contexts_[0]->Connect(options_.server_ip_, options_.server_port_, | ||
/*sharding=*/true, | ||
/*password=*/options_.password_)); | ||
} | ||
|
||
Attach(); | ||
|
||
is_connected_ = true; | ||
RAY_LOG(INFO) << "RedisClient connected."; | ||
|
||
return Status::OK(); | ||
} | ||
|
||
void RedisClient::Attach() { | ||
// Take care of sharding contexts. | ||
RAY_CHECK(shard_asio_async_clients_.empty()) << "Attach shall be called only once"; | ||
for (std::shared_ptr<RedisContext> context : shard_contexts_) { | ||
boost::asio::io_service &io_service = context->io_service(); | ||
shard_asio_async_clients_.emplace_back( | ||
new RedisAsioClient(io_service, context->async_context())); | ||
shard_asio_subscribe_clients_.emplace_back( | ||
new RedisAsioClient(io_service, context->subscribe_context())); | ||
} | ||
|
||
boost::asio::io_service &io_service = primary_context_->io_service(); | ||
asio_async_auxiliary_client_.reset( | ||
new RedisAsioClient(io_service, primary_context_->async_context())); | ||
asio_subscribe_auxiliary_client_.reset( | ||
new RedisAsioClient(io_service, primary_context_->subscribe_context())); | ||
} | ||
|
||
void RedisClient::Disconnect() { | ||
RAY_CHECK(is_connected_); | ||
is_connected_ = false; | ||
RAY_LOG(INFO) << "RedisClient disconnected."; | ||
} | ||
|
||
std::shared_ptr<RedisContext> RedisClient::GetShardContext(const std::string &shard_key) { | ||
static std::hash<std::string> hash; | ||
size_t index = hash(shard_key) % shard_contexts_.size(); | ||
return shard_contexts_[index]; | ||
} | ||
|
||
} // namespace gcs | ||
|
||
} // namespace ray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about use has_next instead of has_more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has_more
is more specific thanhas_next
, right?