Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Commit

Permalink
Hiredis asio integration (ray-project#1547)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored and robertnishihara committed Feb 20, 2018
1 parent 1b596f7 commit eabc402
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cmake/Modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ if(RAY_BUILD_TESTS OR RAY_BUILD_BENCHMARKS)

add_dependencies(gflags gflags_ep)
endif()

set(Boost_USE_STATIC_LIBS ON)
find_package(Boost COMPONENTS system filesystem REQUIRED)
include_directories(${Boost_INCLUDE_DIR})
1 change: 1 addition & 0 deletions src/ray/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ set(RAY_SRCS
gcs/client.cc
gcs/tables.cc
gcs/redis_context.cc
gcs/asio.cc
)

install(FILES
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_custom_command(
add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES})

ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread)
ADD_RAY_TEST(asio_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY})

install(FILES
client.h
Expand Down
110 changes: 110 additions & 0 deletions src/ray/gcs/asio.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "asio.h"

#include "ray/util/logging.h"

RedisAsioClient::RedisAsioClient(boost::asio::io_service &io_service,
redisAsyncContext *async_context)
: async_context_(async_context),
socket_(io_service),
read_requested_(false),
write_requested_(false),
read_in_progress_(false),
write_in_progress_(false) {
// gives access to c->fd
redisContext *c = &(async_context->c);

// hiredis is already connected
// use the existing native socket
socket_.assign(boost::asio::ip::tcp::v4(), c->fd);

// register hooks with the hiredis async context
async_context->ev.addRead = call_C_addRead;
async_context->ev.delRead = call_C_delRead;
async_context->ev.addWrite = call_C_addWrite;
async_context->ev.delWrite = call_C_delWrite;
async_context->ev.cleanup = call_C_cleanup;

// C wrapper functions will use this pointer to call class members.
async_context->ev.data = this;
}

void RedisAsioClient::operate() {
if (read_requested_ && !read_in_progress_) {
read_in_progress_ = true;
socket_.async_read_some(boost::asio::null_buffers(),
boost::bind(&RedisAsioClient::handle_read, this,
boost::asio::placeholders::error));
}

if (write_requested_ && !write_in_progress_) {
write_in_progress_ = true;
socket_.async_write_some(boost::asio::null_buffers(),
boost::bind(&RedisAsioClient::handle_write, this,
boost::asio::placeholders::error));
}
}

void RedisAsioClient::handle_read(boost::system::error_code error_code) {
RAY_CHECK(!error_code || error_code == boost::asio::error::would_block);
read_in_progress_ = false;
redisAsyncHandleRead(async_context_);

if (error_code == boost::asio::error::would_block) {
operate();
}
}

void RedisAsioClient::handle_write(boost::system::error_code error_code) {
RAY_CHECK(!error_code || error_code == boost::asio::error::would_block);
write_in_progress_ = false;
redisAsyncHandleWrite(async_context_);

if (error_code == boost::asio::error::would_block) {
operate();
}
}

void RedisAsioClient::add_read() {
read_requested_ = true;
operate();
}

void RedisAsioClient::del_read() {
read_requested_ = false;
}

void RedisAsioClient::add_write() {
write_requested_ = true;
operate();
}

void RedisAsioClient::del_write() {
write_requested_ = false;
}

void RedisAsioClient::cleanup() {}

static inline RedisAsioClient *cast_to_client(void *private_data) {
RAY_CHECK(private_data != nullptr);
return static_cast<RedisAsioClient *>(private_data);
}

extern "C" void call_C_addRead(void *private_data) {
cast_to_client(private_data)->add_read();
}

extern "C" void call_C_delRead(void *private_data) {
cast_to_client(private_data)->del_read();
}

extern "C" void call_C_addWrite(void *private_data) {
cast_to_client(private_data)->add_write();
}

extern "C" void call_C_delWrite(void *private_data) {
cast_to_client(private_data)->del_write();
}

extern "C" void call_C_cleanup(void *private_data) {
cast_to_client(private_data)->cleanup();
}
73 changes: 73 additions & 0 deletions src/ray/gcs/asio.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Adapted from https://github.com/ryangraham/hiredis-boostasio-adapter
// (Copyright 2018 Ryan Graham)

#ifndef RAY_GCS_ASIO_H
#define RAY_GCS_ASIO_H

#include "hiredis/async.h"
#include "hiredis/hiredis.h"

#include <stdio.h>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/asio/error.hpp>
#include <boost/bind.hpp>

using boost::asio::ip::tcp;

class RedisAsioClient {
public:
RedisAsioClient(boost::asio::io_service &io_service, redisAsyncContext *ac);

void operate();

void handle_read(boost::system::error_code ec);
void handle_write(boost::system::error_code ec);
void add_read();
void del_read();
void add_write();
void del_write();
void cleanup();

private:
redisAsyncContext *async_context_;
boost::asio::ip::tcp::socket socket_;
// Hiredis wanted to add a read operation to the event loop
// but the read might not have happened yet
bool read_requested_;
// Hiredis wanted to add a write operation to the event loop
// but the read might not have happened yet
bool write_requested_;
// A read is currently in progress
bool read_in_progress_;
// A write is currently in progress
bool write_in_progress_;
};

// C wrappers for class member functions
extern "C" void call_C_addRead(void *private_data);
extern "C" void call_C_delRead(void *private_data);
extern "C" void call_C_addWrite(void *private_data);
extern "C" void call_C_delWrite(void *private_data);
extern "C" void call_C_cleanup(void *private_data);

#endif // RAY_GCS_ASIO_H
54 changes: 54 additions & 0 deletions src/ray/gcs/asio_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <iostream>

#include "gtest/gtest.h"

#include "asio.h"

boost::asio::io_service io_service;

// For this test to work, you need to have a redis-server in your PATH

class TestRedisAsioClient : public ::testing::Test {
public:
TestRedisAsioClient() {
int r = system("redis-server > /dev/null & sleep 1");
std::cout << "TestRedisAsioClient: redis-server status code was " << r
<< std::endl;
}
~TestRedisAsioClient() {
int r = system("redis-cli -c shutdown");
std::cout << "TestRedisAsioClient: redis-cli status code was " << r
<< std::endl;
}
};

void ConnectCallback(const redisAsyncContext *c, int status) {
ASSERT_EQ(status, REDIS_OK);
}

void DisconnectCallback(const redisAsyncContext *c, int status) {
ASSERT_EQ(status, REDIS_OK);
}

void GetCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = reinterpret_cast<redisReply *>(r);
ASSERT_TRUE(reply != nullptr);
ASSERT_TRUE(std::string(reinterpret_cast<char *>(reply->str)) == "test");
redisAsyncDisconnect(c);
io_service.stop();
}

TEST_F(TestRedisAsioClient, TestRedisCommands) {
redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);
ASSERT_TRUE(ac->err == 0);

RedisAsioClient client(io_service, ac);

redisAsyncSetConnectCallback(ac, ConnectCallback);
redisAsyncSetDisconnectCallback(ac, DisconnectCallback);

redisAsyncCommand(ac, NULL, NULL, "SET key test");
redisAsyncCommand(ac, GetCallback, nullptr, "GET key");

io_service.run();
}

0 comments on commit eabc402

Please sign in to comment.