Skip to content

Commit

Permalink
sync write internal config in gcs (ray-project#13197)
Browse files Browse the repository at this point in the history
  • Loading branch information
wumuzi520 authored Jan 17, 2021
1 parent 8c8af26 commit 2cd51ce
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
53 changes: 43 additions & 10 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "gflags/gflags.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_server/gcs_server.h"
#include "ray/gcs/store_client/redis_store_client.h"
#include "ray/stats/stats.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/gcs_service.pb.h"

DEFINE_string(redis_address, "", "The ip address of redis.");
DEFINE_int32(redis_port, -1, "The port of redis.");
Expand Down Expand Up @@ -46,19 +48,50 @@ int main(int argc, char *argv[]) {
const std::string node_ip_address = FLAGS_node_ip_address;
gflags::ShutDownCommandLineFlags();

std::unordered_map<std::string, std::string> config_map;
auto promise =
std::make_shared<std::promise<std::unordered_map<std::string, std::string>>>();
std::thread([=] {
boost::asio::io_service service;

// Parse the configuration list.
std::istringstream config_string(config_list);
std::string config_name;
std::string config_value;
// Init backend client.
ray::gcs::RedisClientOptions redis_client_options(redis_address, redis_port,
redis_password);
auto redis_client = std::make_shared<ray::gcs::RedisClient>(redis_client_options);
auto status = redis_client->Connect(service);
RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status;

while (std::getline(config_string, config_name, ',')) {
RAY_CHECK(std::getline(config_string, config_value, ';'));
config_map[config_name] = config_value;
}
// Init storage.
auto storage = std::make_shared<ray::gcs::RedisGcsTableStorage>(redis_client);

RayConfig::instance().initialize(config_map);
// Parse the configuration list.
std::unordered_map<std::string, std::string> config;

std::istringstream config_string(config_list);
std::string config_name;
std::string config_value;

while (std::getline(config_string, config_name, ',')) {
RAY_CHECK(std::getline(config_string, config_value, ';'));
config[config_name] = config_value;
}

// The internal_config is only set on the gcs--other nodes get it from GCS.
ray::rpc::SetInternalConfigRequest request;
request.mutable_config()->mutable_config()->insert(config.begin(), config.end());

auto on_done = [promise, &service, &config](const ray::Status &status) {
promise->set_value(config);
service.stop();
};
RAY_CHECK_OK(storage->InternalConfigTable().Put(ray::UniqueID::Nil(),
request.config(), on_done));
boost::asio::io_service::work work(service);
service.run();
})
.detach();
auto config = promise->get_future().get();

RayConfig::instance().initialize(config);
const ray::stats::TagsType global_tags = {
{ray::stats::ComponentKey, "gcs_server"},
{ray::stats::VersionKey, "2.0.0.dev0"},
Expand Down
17 changes: 0 additions & 17 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,13 @@ int main(int argc, char *argv[]) {
const int maximum_startup_concurrency =
static_cast<int>(FLAGS_maximum_startup_concurrency);
const std::string static_resource_list = FLAGS_static_resource_list;
const std::string config_list = FLAGS_config_list;
const std::string python_worker_command = FLAGS_python_worker_command;
const std::string java_worker_command = FLAGS_java_worker_command;
const std::string agent_command = FLAGS_agent_command;
const std::string cpp_worker_command = FLAGS_cpp_worker_command;
const std::string redis_password = FLAGS_redis_password;
const std::string temp_dir = FLAGS_temp_dir;
const std::string session_dir = FLAGS_session_dir;
const bool head_node = FLAGS_head_node;
const int64_t object_store_memory = FLAGS_object_store_memory;
const std::string plasma_directory = FLAGS_plasma_directory;
const bool huge_pages = FLAGS_huge_pages;
Expand All @@ -117,21 +115,6 @@ int main(int argc, char *argv[]) {

RAY_CHECK_OK(gcs_client->Connect(main_service));

// The system_config is only set on the head node--other nodes get it from GCS.
if (head_node) {
// Parse the configuration list.
std::istringstream config_string(config_list);
std::string config_name;
std::string config_value;

while (std::getline(config_string, config_name, ',')) {
RAY_CHECK(std::getline(config_string, config_value, ';'));
// TODO(rkn): The line below could throw an exception. What should we do about this?
raylet_config[config_name] = config_value;
}
RAY_CHECK_OK(gcs_client->Nodes().AsyncSetInternalConfig(raylet_config));
}

std::unique_ptr<ray::raylet::Raylet> server(nullptr);

RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig(
Expand Down

0 comments on commit 2cd51ce

Please sign in to comment.