Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Commit

Permalink
Deploy Redis module and start using custom Redis commands. (ray-proje…
Browse files Browse the repository at this point in the history
…ct#128)

* Add RAY.CONNECT Redis command.

* Add RAY.GET_CLIENT_ADDRESS command.

* Build and clean Redis in common Makefile.

* Use custom Redis module in Ray and use custom CONNECT and GET_CLIENT_ADDRESS commands.

* Fixes.

* Remove mapping from redis client ID to ray db client ID.

* Fix.
  • Loading branch information
robertnishihara committed Dec 16, 2016
1 parent 1c95840 commit 58a873e
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 57 deletions.
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pushd "$COMMON_DIR"
make
popd
cp "$COMMON_DIR/thirdparty/redis/src/redis-server" "$PYTHON_COMMON_DIR/thirdparty/redis/src/"
cp "$COMMON_DIR/redis_module/ray_redis_module.so" "$PYTHON_COMMON_DIR/redis_module/ray_redis_module.so"

pushd "$PLASMA_DIR"
make clean
Expand Down
Empty file.
5 changes: 4 additions & 1 deletion lib/python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,15 @@ def start_redis(num_retries=20, cleanup=True):
Exception: An exception is raised if Redis could not be started.
"""
redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/redis_module/ray_redis_module.so")
assert os.path.isfile(redis_filepath)
assert os.path.isfile(redis_module)
counter = 0
while counter < num_retries:
if counter > 0:
print("Redis failed to start, retrying now.")
port = new_port()
p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning"])
p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning", "--loadmodule", redis_module])
time.sleep(0.1)
# Check if Redis successfully started (or at least if it the executable did
# not exit within 0.1 seconds).
Expand Down
3 changes: 2 additions & 1 deletion lib/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def run(self):
setup(name="ray",
version="0.0.1",
packages=find_packages(),
package_data={"common": ["thirdparty/redis/src/redis-server"],
package_data={"common": ["thirdparty/redis/src/redis-server",
"redis_module/ray_redis_module.so"],
"plasma": ["plasma_store",
"plasma_manager",
"libplasma.so"],
Expand Down
14 changes: 9 additions & 5 deletions src/common/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CC = gcc
CFLAGS = -g -Wall -Wextra -Werror=implicit-function-declaration -Wno-typedef-redefinition -Wno-sign-compare -Wno-unused-parameter -Wno-type-limits -Wno-missing-field-initializers --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae
BUILD = build

all: hiredis redis $(BUILD)/libcommon.a
all: hiredis redis redismodule $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o thirdparty/ae/ae.o thirdparty/sha256.o
ar rcs $@ $^
Expand Down Expand Up @@ -31,16 +31,20 @@ $(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h
clean:
rm -f *.o state/*.o test/*.o thirdparty/ae/*.o
rm -rf $(BUILD)/*
cd redis_module; make clean

redis:
cd thirdparty ; bash ./build-redis.sh

hiredis:
cd thirdparty/hiredis ; make

redismodule:
cd redis_module && make && cd ..

test: CFLAGS += -DRAY_COMMON_LOG_LEVEL=4
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis/src/redis-server &
test: hiredis redis redismodule $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis/src/redis-server --loadmodule ./redis_module/ray_redis_module.so &
sleep 1s
./build/common_tests
./build/db_tests
Expand All @@ -50,10 +54,10 @@ test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/obj
./build/task_table_tests
./build/object_table_tests
./thirdparty/redis/src/redis-cli shutdown
cd redis_module && make && sleep 1 && python runtest.py && cd ..
python ./redis_module/runtest.py

valgrind: test
./thirdparty/redis/src/redis-server &
./thirdparty/redis/src/redis-server --loadmodule redis_module/ray_redis_module.so &
sleep 1s
valgrind --leak-check=full --error-exitcode=1 ./build/common_tests
valgrind --leak-check=full --error-exitcode=1 ./build/db_tests
Expand Down
126 changes: 126 additions & 0 deletions src/common/redis_module/ray_redis_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* TODO(pcm): Fill this out.
*/

#define DB_CLIENT_PREFIX "CL:"
#define OBJECT_INFO_PREFIX "OI:"
#define OBJECT_LOCATION_PREFIX "OL:"
#define OBJECT_SUBSCRIBE_PREFIX "OS:"
Expand All @@ -41,6 +42,117 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
return key;
}

/**
* Register a client with Redis. This is called from a client with the command:
*
* RAY.CONNECT <client type> <address> <ray client id> <aux address>
*
* @param client_type The type of the client (e.g., plasma_manager).
* @param address The address of the client.
* @param ray_client_id The db client ID of the client.
* @param aux_address An auxiliary address. This is currently just used by the
* local scheduler to record the address of the plasma manager that it is
* connected to.
* @return OK if the operation was successful.
*/
int Connect_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 5) {
return RedisModule_WrongArity(ctx);
}

RedisModuleString *client_type = argv[1];
RedisModuleString *address = argv[2];
RedisModuleString *ray_client_id = argv[3];
RedisModuleString *aux_address = argv[4];

/* Add this client to the Ray db client table. */
RedisModuleKey *db_client_table_key =
OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_WRITE);
RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS,
"client_type", client_type, "address", address,
"aux_address", aux_address, NULL);
/* Clean up. */
RedisModule_CloseKey(db_client_table_key);

/* Construct strings to publish on the db client channel. */
RedisModuleString *channel_name =
RedisModule_CreateString(ctx, "db_clients", strlen("db_clients"));
RedisModuleString *client_info =
RedisModule_CreateStringFromString(ctx, ray_client_id);
RedisModule_StringAppendBuffer(ctx, client_info, ":", strlen(":"));
/* Append the client type. */
size_t client_type_size;
const char *client_type_str =
RedisModule_StringPtrLen(client_type, &client_type_size);
RedisModule_StringAppendBuffer(ctx, client_info, client_type_str,
client_type_size);
/* Append a space. */
RedisModule_StringAppendBuffer(ctx, client_info, " ", strlen(" "));
/* Append the aux address. */
size_t aux_address_size;
const char *aux_address_str =
RedisModule_StringPtrLen(aux_address, &aux_address_size);
RedisModule_StringAppendBuffer(ctx, client_info, aux_address_str,
aux_address_size);
/* Publish the client info on the db client channel. */
RedisModuleCallReply *reply;
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, client_info);
RedisModule_FreeString(ctx, channel_name);
RedisModule_FreeString(ctx, client_info);
if (reply == NULL) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}

RedisModule_ReplyWithSimpleString(ctx, "OK");
return REDISMODULE_OK;
}

/**
* Get the address of a client from its db client ID. This is called from a
* client with the command:
*
* RAY.GET_CLIENT_ADDRESS <ray client id>
*
* @param ray_client_id The db client ID of the client.
* @return The address of the client if the operation was successful.
*/
int GetClientAddress_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 2) {
return RedisModule_WrongArity(ctx);
}

RedisModuleString *ray_client_id = argv[1];
/* Get the request client address from the db client table. */
RedisModuleKey *db_client_table_key =
OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_READ);
if (db_client_table_key == NULL) {
/* There is no client with this ID. */
RedisModule_CloseKey(db_client_table_key);
return RedisModule_ReplyWithError(ctx, "invalid client ID");
}
RedisModuleString *address;
RedisModule_HashGet(db_client_table_key, REDISMODULE_HASH_CFIELDS, "address",
&address, NULL);
if (address == NULL) {
/* The key did not exist. This should not happen. */
RedisModule_CloseKey(db_client_table_key);
return RedisModule_ReplyWithError(
ctx, "Client does not have an address field. This shouldn't happen.");
}

RedisModule_ReplyWithString(ctx, address);

/* Cleanup. */
RedisModule_CloseKey(db_client_table_key);
RedisModule_FreeString(ctx, address);

return REDISMODULE_OK;
}

/**
* Lookup an entry in the object table.
*
Expand Down Expand Up @@ -81,6 +193,9 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx,
} while (RedisModule_ZsetRangeNext(key));
RedisModule_ReplySetArrayLength(ctx, num_results);

/* Clean up. */
RedisModule_CloseKey(key);

return REDISMODULE_OK;
}

Expand Down Expand Up @@ -285,6 +400,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.connect", Connect_RedisCommand,
"write", 0, 0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.get_client_address",
GetClientAddress_RedisCommand, "write", 0, 0,
0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.object_table_lookup",
ObjectTableLookup_RedisCommand, "readonly", 0,
0, 0) == REDISMODULE_ERR) {
Expand Down
45 changes: 8 additions & 37 deletions src/common/state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,43 +174,14 @@ db_handle *db_connect_extended(const char *address,
freeReplyObject(reply);
/* Add new client using optimistic locking. */
db_client_id client = globally_unique_id();
while (true) {
reply = redisCommand(context, "WATCH %s", client_type);
freeReplyObject(reply);
reply = redisCommand(context, "HLEN %s", client_type);
freeReplyObject(reply);
reply = redisCommand(context, "MULTI");
freeReplyObject(reply);
reply = redisCommand(context,
"HMSET db_clients:%b client_type %s address %s:%d "
"db_client_id %b aux_address %s",
(char *) client.id, sizeof(client.id), client_type,
client_addr, client_port, (char *) client.id,
sizeof(client.id), aux_address);
CHECKM(reply != NULL, "db_connect failed on HMSET");
freeReplyObject(reply);

{
UT_string *tmpbuf;
utstring_new(tmpbuf);
utstring_printf(tmpbuf, "%s %s", client_type, aux_address);
reply =
redisCommand(context, "PUBLISH db_clients %b:%s", (char *) client.id,
sizeof(client.id), utstring_body(tmpbuf));
CHECKM(reply != NULL, "db_connect failed on PUBLISH");
freeReplyObject(reply);
utstring_free(tmpbuf);
}

reply = redisCommand(context, "EXEC");
CHECKM(reply != NULL, "db_connect failed on EXEC");
CHECK(reply);
if (reply->type != REDIS_REPLY_NIL) {
freeReplyObject(reply);
break;
}
freeReplyObject(reply);
}
/* Register this client with Redis. RAY.CONNECT is a custom Redis command that
* we've defined. */
reply = redisCommand(context, "RAY.CONNECT %s %s:%d %b %s", client_type,
client_addr, client_port, (char *) client.id,
sizeof(client.id), aux_address);
CHECKM(reply != NULL, "db_connect failed on RAY.CONNECT");
freeReplyObject(reply);

db->client_type = strdup(client_type);
db->client = client;
Expand Down Expand Up @@ -576,7 +547,7 @@ void redis_get_cached_db_client(db_handle *db,
if (!entry) {
/* This is a very rare case. It should happen at most once per db client. */
redisReply *reply =
redisCommand(db->sync_context, "HGET db_clients:%b address",
redisCommand(db->sync_context, "RAY.GET_CLIENT_ADDRESS %b",
(char *) db_client_id.id, sizeof(db_client_id.id));
CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d",
reply->type);
Expand Down
14 changes: 10 additions & 4 deletions src/global_scheduler/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ class TestGlobalScheduler(unittest.TestCase):
def setUp(self):
# Start a Redis server.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../common/redis_module/ray_redis_module.so")
assert os.path.isfile(redis_path)
assert os.path.isfile(redis_module)
node_ip_address = "127.0.0.1"
redis_port = new_port()
redis_address = "{}:{}".format(node_ip_address, redis_port)
self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning"])
self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning", "--loadmodule", redis_module])
time.sleep(0.1)
# Create a Redis client.
self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port)
Expand Down Expand Up @@ -86,9 +89,12 @@ def tearDown(self):
self.redis_process.kill()

def test_redis_contents(self):
# There should be two db clients, the global scheduler, the local scheduler,
# and the plasma manager.
self.assertEqual(len(self.redis_client.keys("db_clients*")), 3)
# DB_CLIENT_PREFIX is an implementation detail of ray_redis_module.c, so
# this must be kept in sync with that file.
DB_CLIENT_PREFIX = "CL:"
# There should be three db clients, the global scheduler, the local
# scheduler, and the plasma manager.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
# There should not be anything else in Redis yet.
self.assertEqual(len(self.redis_client.keys("*")), 3)

Expand Down
4 changes: 2 additions & 2 deletions src/photon/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ clean:
# Set the request timeout low and logging level at FATAL for testing purposes.
test: CFLAGS += -DRAY_TIMEOUT=50 -DRAY_COMMON_LOG_LEVEL=4
test: $(BUILD)/photon_tests FORCE
../common/thirdparty/redis/src/redis-server &
../common/thirdparty/redis/src/redis-server --loadmodule ../common/redis_module/ray_redis_module.so &
sleep 0.5s && ./build/photon_tests && ../common/thirdparty/redis/src/redis-cli shutdown

valgrind: test
../common/thirdparty/redis/src/redis-server &
../common/thirdparty/redis/src/redis-server --loadmodule ../common/redis_module/ray_redis_module.so &
sleep 0.5s && valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 ./build/photon_tests && ../common/thirdparty/redis/src/redis-cli shutdown

FORCE:
2 changes: 1 addition & 1 deletion src/plasma/test/run_client_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Cause the script to exit if a single command fails.
set -e

../common/thirdparty/redis/src/redis-server --loglevel warning &
../common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ../common/redis_module/ray_redis_module.so &
sleep 1
# flush the redis server
../common/thirdparty/redis/src/redis-cli flushall &
Expand Down
11 changes: 5 additions & 6 deletions src/plasma/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffe
unit_test.assertEqual(client1.get_metadata(object_id)[:],
client2.get_metadata(object_id)[:])

# Check if the redis-server binary is present.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis/src/redis-server")
if not os.path.exists(redis_path):
raise Exception("You do not have the redis-server binary. Run `make test` in the plasma directory to get it.")

class TestPlasmaClient(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -364,10 +359,14 @@ def setUp(self):
store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
# Start a Redis server.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../common/redis_module/ray_redis_module.so")
assert os.path.isfile(redis_path)
assert os.path.isfile(redis_module)
redis_port = 6379
with open(os.devnull, "w") as FNULL:
self.redis_process = subprocess.Popen([redis_path,
"--port", str(redis_port)],
"--port", str(redis_port),
"--loadmodule", redis_module],
stdout=FNULL)
time.sleep(0.1)
# Start two PlasmaManagers.
Expand Down

0 comments on commit 58a873e

Please sign in to comment.