Skip to content

Gcs Asio integration #1633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/global_scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall")

add_executable(global_scheduler global_scheduler.cc global_scheduler_algorithm.cc)
target_link_libraries(global_scheduler common ${HIREDIS_LIB} ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB})
target_link_libraries(global_scheduler common ${HIREDIS_LIB} ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
8 changes: 4 additions & 4 deletions src/local_scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ add_library(local_scheduler_client STATIC local_scheduler_client.cc)
add_dependencies(local_scheduler_client gen_local_scheduler_fbs)

if(APPLE)
target_link_libraries(local_scheduler_library "-undefined dynamic_lookup" local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB})
target_link_libraries(local_scheduler_library "-undefined dynamic_lookup" local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
else(APPLE)
target_link_libraries(local_scheduler_library local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB})
target_link_libraries(local_scheduler_library local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
endif(APPLE)

add_dependencies(local_scheduler_library gen_local_scheduler_fbs)

add_executable(local_scheduler local_scheduler.cc local_scheduler_algorithm.cc)
target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread)
target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread ${Boost_SYSTEM_LIBRARY})

add_executable(local_scheduler_tests test/local_scheduler_tests.cc local_scheduler.cc local_scheduler_algorithm.cc)
target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread)
target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread ${Boost_SYSTEM_LIBRARY})
target_compile_options(local_scheduler_tests PUBLIC "-DLOCAL_SCHEDULER_TEST")

install(TARGETS local_scheduler_library DESTINATION ${CMAKE_SOURCE_DIR}/local_scheduler)
7 changes: 4 additions & 3 deletions src/plasma/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ project(plasma)
# Recursively include common
include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake)

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -O3")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -O3 -Werror -Wall")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the reason for removing these? Just wondering!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was not compatible with asio (errors in the include file popped up). I'm not sure why we needed it in the first place, but things seem to be working without it. We should keep an eye on it though in case any problems come up.

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -O3")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -O3 -Werror -Wall")

if(UNIX AND NOT APPLE)
link_libraries(rt)
Expand All @@ -22,7 +22,8 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
add_executable(plasma_manager
plasma_manager.cc)

target_link_libraries(plasma_manager common ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread)
target_link_libraries(plasma_manager common ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread ${Boost_SYSTEM_LIBRARY})

define_test(client_tests "")
define_test(manager_tests "" plasma_manager.cc)
target_link_libraries(manager_tests ${Boost_SYSTEM_LIBRARY})
2 changes: 1 addition & 1 deletion src/ray/gcs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ add_custom_command(

add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES})

ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread)
ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(asio_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY})

install(FILES
Expand Down
6 changes: 6 additions & 0 deletions src/ray/gcs/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ Status Attach(plasma::EventLoop &event_loop) {
return Status::OK();
}

Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) {
asio_client_.reset(
new RedisAsioClient(io_service, context_->async_context()));
return Status::OK();
}

ObjectTable &AsyncGcsClient::object_table() {
return *object_table_;
}
Expand Down
7 changes: 7 additions & 0 deletions src/ray/gcs/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "plasma/events.h"
#include "ray/id.h"
#include "ray/status.h"
#include "ray/gcs/asio.h"
#include "ray/gcs/tables.h"
#include "ray/util/logging.h"

Expand All @@ -22,7 +23,12 @@ class RAY_EXPORT AsyncGcsClient {
~AsyncGcsClient();

Status Connect(const std::string &address, int port);
/// Attach this client to a plasma event loop. Note that only
/// one event loop should be attached at a time.
Status Attach(plasma::EventLoop &event_loop);
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
Status Attach(boost::asio::io_service &io_service);

inline FunctionTable &function_table();
// TODO: Some API for getting the error on the driver
Expand Down Expand Up @@ -51,6 +57,7 @@ class RAY_EXPORT AsyncGcsClient {
std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<TaskTable> task_table_;
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_client_;
};

class SyncGcsClient {
Expand Down
84 changes: 64 additions & 20 deletions src/ray/gcs/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,50 @@ extern "C" {

namespace ray {

aeEventLoop *loop;

class TestGcs : public ::testing::Test {
public:
TestGcs() {
RAY_CHECK_OK(client_.Connect("127.0.0.1", 6379));
job_id_ = UniqueID::from_random();
}

virtual ~TestGcs(){};

virtual void Start() = 0;

virtual void Stop() = 0;

protected:
gcs::AsyncGcsClient client_;
UniqueID job_id_;
};

TestGcs *test;

class TestGcsWithAe : public TestGcs {
public:
TestGcsWithAe() {
loop_ = aeCreateEventLoop(1024);
RAY_CHECK_OK(client_.context()->AttachToEventLoop(loop_));
}
~TestGcsWithAe() override { aeDeleteEventLoop(loop_); }
void Start() override { aeMain(loop_); }
void Stop() override { aeStop(loop_); }

private:
aeEventLoop *loop_;
};

class TestGcsWithAsio : public TestGcs {
public:
TestGcsWithAsio() { RAY_CHECK_OK(client_.Attach(io_service_)); }
void Start() override { io_service_.run(); }
void Stop() override { io_service_.stop(); }

private:
boost::asio::io_service io_service_;
};

void ObjectAdded(gcs::AsyncGcsClient *client,
const UniqueID &id,
std::shared_ptr<ObjectTableDataT> data) {
Expand All @@ -36,21 +66,28 @@ void Lookup(gcs::AsyncGcsClient *client,
const UniqueID &id,
std::shared_ptr<ObjectTableDataT> data) {
ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
aeStop(loop);
test->Stop();
}

TEST_F(TestGcs, TestObjectTable) {
loop = aeCreateEventLoop(1024);
RAY_CHECK_OK(client_.context()->AttachToEventLoop(loop));
void TestObjectTable(const UniqueID &job_id, gcs::AsyncGcsClient &client) {
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("A");
data->managers.push_back("B");
ObjectID object_id = ObjectID::from_random();
RAY_CHECK_OK(
client_.object_table().Add(job_id_, object_id, data, &ObjectAdded));
RAY_CHECK_OK(client_.object_table().Lookup(job_id_, object_id, &Lookup));
aeMain(loop);
aeDeleteEventLoop(loop);
client.object_table().Add(job_id, object_id, data, &ObjectAdded));
RAY_CHECK_OK(client.object_table().Lookup(job_id, object_id, &Lookup));
test->Start();
}

TEST_F(TestGcsWithAe, TestObjectTable) {
test = this;
TestObjectTable(job_id_, client_);
}

TEST_F(TestGcsWithAsio, TestObjectTable) {
test = this;
TestObjectTable(job_id_, client_);
}

void TaskAdded(gcs::AsyncGcsClient *client,
Expand All @@ -69,7 +106,7 @@ void TaskLookupAfterUpdate(gcs::AsyncGcsClient *client,
const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, SchedulingState_LOST);
aeStop(loop);
test->Stop();
}

void TaskUpdateCallback(gcs::AsyncGcsClient *client,
Expand All @@ -80,25 +117,32 @@ void TaskUpdateCallback(gcs::AsyncGcsClient *client,
&TaskLookupAfterUpdate));
}

TEST_F(TestGcs, TestTaskTable) {
loop = aeCreateEventLoop(1024);
RAY_CHECK_OK(client_.context()->AttachToEventLoop(loop));
void TestTaskTable(const UniqueID &job_id, gcs::AsyncGcsClient &client) {
auto data = std::make_shared<TaskTableDataT>();
data->scheduling_state = SchedulingState_SCHEDULED;
DBClientID local_scheduler_id =
DBClientID::from_binary("abcdefghijklmnopqrst");
data->scheduler_id = local_scheduler_id.binary();
TaskID task_id = TaskID::from_random();
RAY_CHECK_OK(client_.task_table().Add(job_id_, task_id, data, &TaskAdded));
RAY_CHECK_OK(client_.task_table().Lookup(job_id_, task_id, &TaskLookup));
RAY_CHECK_OK(client.task_table().Add(job_id, task_id, data, &TaskAdded));
RAY_CHECK_OK(client.task_table().Lookup(job_id, task_id, &TaskLookup));
auto update = std::make_shared<TaskTableTestAndUpdateT>();
update->test_scheduler_id = local_scheduler_id.binary();
update->test_state_bitmask = SchedulingState_SCHEDULED;
update->update_state = SchedulingState_LOST;
RAY_CHECK_OK(client_.task_table().TestAndUpdate(job_id_, task_id, update,
&TaskUpdateCallback));
aeMain(loop);
aeDeleteEventLoop(loop);
RAY_CHECK_OK(client.task_table().TestAndUpdate(job_id, task_id, update,
&TaskUpdateCallback));
test->Start();
}

TEST_F(TestGcsWithAe, TestTaskTable) {
test = this;
TestTaskTable(job_id_, client_);
}

TEST_F(TestGcsWithAsio, TestTaskTable) {
test = this;
TestTaskTable(job_id_, client_);
}

} // namespace
1 change: 1 addition & 0 deletions src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class RedisContext {
uint8_t *data,
int64_t length,
int64_t callback_index);
redisAsyncContext *async_context() { return async_context_; }

private:
redisContext *context_;
Expand Down