Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change db_connect to allow different arguments from different processes. #142

Merged
merged 7 commits into from
Dec 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,28 @@ PYTHON_PLASMA_DIR="$PYTHON_DIR/plasma"
PYTHON_PHOTON_DIR="$PYTHON_DIR/photon"
PYTHON_GLOBAL_SCHEDULER_DIR="$PYTHON_DIR/global_scheduler"

# First clean up old build files.
pushd "$COMMON_DIR"
make clean
popd
pushd "$PLASMA_DIR"
make clean
popd
pushd "$PHOTON_DIR"
make clean
popd
pushd "$GLOBAL_SCHEDULER_DIR"
make clean
popd

# Now build everything.
pushd "$COMMON_DIR"
make
popd
cp "$COMMON_DIR/thirdparty/redis/src/redis-server" "$PYTHON_COMMON_DIR/thirdparty/redis/src/"
cp "$COMMON_DIR/redis_module/ray_redis_module.so" "$PYTHON_COMMON_DIR/redis_module/ray_redis_module.so"

pushd "$PLASMA_DIR"
make clean
make
pushd "$PLASMA_DIR/build"
cmake ..
Expand All @@ -48,7 +61,6 @@ cp "$PLASMA_DIR/plasma/plasma.py" "$PYTHON_PLASMA_DIR/"
cp "$PLASMA_DIR/plasma/libplasma.so" "$PYTHON_PLASMA_DIR/"

pushd "$PHOTON_DIR"
make clean
make
pushd "$PHOTON_DIR/build"
cmake ..
Expand All @@ -60,7 +72,6 @@ cp "$PHOTON_DIR/photon/libphoton.so" "$PYTHON_PHOTON_DIR/photon/"
cp "$PHOTON_DIR/photon/photon_services.py" "$PYTHON_PHOTON_DIR/photon/"

pushd "$GLOBAL_SCHEDULER_DIR"
make clean
make
popd
cp "$GLOBAL_SCHEDULER_DIR/build/global_scheduler" "$PYTHON_GLOBAL_SCHEDULER_DIR/build/"
Expand Down
8 changes: 5 additions & 3 deletions lib/python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ def start_global_scheduler(redis_address, cleanup=True):
if cleanup:
all_processes.append(p)

def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True):
def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True):
"""Start a local scheduler process.

Args:
redis_address (str): The address of the Redis instance.
node_ip_address (str): The IP address of the node that this local scheduler
is running on.
plasma_store_name (str): The name of the plasma store socket to connect to.
plasma_manager_name (str): The name of the plasma manager socket to connect
to.
Expand All @@ -133,7 +135,7 @@ def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name,
Return:
The name of the local scheduler socket.
"""
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER)
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER)
if cleanup:
all_processes.append(p)
return local_scheduler_name
Expand Down Expand Up @@ -247,7 +249,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedu
time.sleep(0.1)
# Start the local scheduler.
plasma_address = "{}:{}".format(node_ip_address, object_store_manager_port)
local_scheduler_name = start_local_scheduler(redis_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True)
local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True)
local_scheduler_names.append(local_scheduler_name)
time.sleep(0.1)
# Aggregate the address information together.
Expand Down
46 changes: 34 additions & 12 deletions src/common/redis_module/ray_redis_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,40 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
int Connect_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 5) {
if (argc < 4) {
return RedisModule_WrongArity(ctx);
}
if (argc % 2 != 0) {
return RedisModule_WrongArity(ctx);
}

RedisModuleString *client_type = argv[1];
RedisModuleString *address = argv[2];
RedisModuleString *ray_client_id = argv[3];
RedisModuleString *aux_address = argv[4];
RedisModuleString *ray_client_id = argv[1];
RedisModuleString *node_ip_address = argv[2];
RedisModuleString *client_type = argv[3];

/* Add this client to the Ray db client table. */
RedisModuleKey *db_client_table_key =
OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_WRITE);

/* This will be used to construct a publish message. */
RedisModuleString *aux_address = NULL;
RedisModuleString *aux_address_key =
RedisModule_CreateString(ctx, "aux_address", strlen("aux_address"));

RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS,
"client_type", client_type, "address", address,
"aux_address", aux_address, NULL);
"node_ip_address", node_ip_address, NULL);

for (int i = 4; i < argc; i += 2) {
RedisModuleString *key = argv[i];
RedisModuleString *value = argv[i + 1];
RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_NONE, key, value,
NULL);
if (RedisModule_StringCompare(key, aux_address_key) == 0) {
aux_address = value;
}
}
/* Clean up. */
RedisModule_FreeString(ctx, aux_address_key);
RedisModule_CloseKey(db_client_table_key);

/* Construct strings to publish on the db client channel. */
Expand All @@ -107,11 +125,15 @@ int Connect_RedisCommand(RedisModuleCtx *ctx,
/* Append a space. */
RedisModule_StringAppendBuffer(ctx, client_info, " ", strlen(" "));
/* Append the aux address. */
size_t aux_address_size;
const char *aux_address_str =
RedisModule_StringPtrLen(aux_address, &aux_address_size);
RedisModule_StringAppendBuffer(ctx, client_info, aux_address_str,
aux_address_size);
if (aux_address == NULL) {
RedisModule_StringAppendBuffer(ctx, client_info, ":", strlen(":"));
} else {
size_t aux_address_size;
const char *aux_address_str =
RedisModule_StringPtrLen(aux_address, &aux_address_size);
RedisModule_StringAppendBuffer(ctx, client_info, aux_address_str,
aux_address_size);
}
/* Publish the client info on the db client channel. */
RedisModuleCallReply *reply;
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, client_info);
Expand Down
23 changes: 10 additions & 13 deletions src/common/state/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,22 @@ typedef struct db_handle db_handle;
* @param db_address The hostname to use to connect to the database.
* @param db_port The port to use to connect to the database.
* @param client_type The type of this client.
* @param client_addr The hostname of the client that is connecting. If not
* relevant, set this to the empty string.
* @param client_port The port of the client that is connecting. If not
* relevant, set this to -1.
* @param node_ip_address The hostname of the client that is connecting.
* @param num_args The number of extra arguments that should be supplied. This
* should be an even number.
* @param args An array of extra arguments strings. They should alternate
* between the name of the argument and the value of the argument. For
* example: "port", "1234", "socket_name", "/tmp/s1".
* @return This returns a handle to the database, which must be freed with
* db_disconnect after use.
*/

db_handle *db_connect(const char *db_address,
int db_port,
const char *client_type,
const char *client_addr,
int client_port);
db_handle *db_connect_extended(const char *db_address,
int db_port,
const char *client_type,
const char *client_addr,
int client_port,
const char *aux_address);
const char *node_ip_address,
int num_args,
const char **args);

/**
* Attach global system store connection to an event loop. Callbacks from
* queries to the global system store will trigger events in the event loop.
Expand Down
80 changes: 50 additions & 30 deletions src/common/state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,78 +129,98 @@ typedef struct {
object_id object_id;
} object_table_get_entry_info;

db_handle *db_connect(const char *address,
int port,
db_handle *db_connect(const char *db_address,
int db_port,
const char *client_type,
const char *client_addr,
int client_port) {
return db_connect_extended(address, port, client_type, client_addr,
client_port, ":");
}
const char *node_ip_address,
int num_args,
const char **args) {
/* Check that the number of args is even. These args will be passed to the
* RAY.CONNECT Redis command, which takes arguments in pairs. */
if (num_args % 2 != 0) {
LOG_FATAL("The number of extra args must be divisible by two.");
}

db_handle *db_connect_extended(const char *address,
int port,
const char *client_type,
const char *client_addr,
int client_port,
const char *aux_address) {
db_handle *db = malloc(sizeof(db_handle));
/* Sync connection for initial handshake */
redisReply *reply;
int connection_attempts = 0;
redisContext *context = redisConnect(address, port);
/* Sanity check aux_address. */
if (aux_address == NULL || strlen(aux_address) == 0) {
LOG_WARN("db_connect: received empty aux_address, replacing with ':'");
aux_address = ":";
}
redisContext *context = redisConnect(db_address, db_port);
while (context == NULL || context->err) {
if (connection_attempts >= REDIS_DB_CONNECT_RETRIES) {
break;
}
LOG_WARN("Failed to connect to Redis, retrying.");
/* Sleep for a little. */
usleep(REDIS_DB_CONNECT_WAIT_MS * 1000);
context = redisConnect(address, port);
context = redisConnect(db_address, db_port);
connection_attempts += 1;
}
CHECK_REDIS_CONNECT(redisContext, context,
"could not establish synchronous connection to redis "
"%s:%d",
address, port);
db_address, db_port);
/* Enable keyspace events. */
reply = redisCommand(context, "CONFIG SET notify-keyspace-events AKE");
CHECKM(reply != NULL, "db_connect failed on CONFIG SET");
freeReplyObject(reply);
/* Add new client using optimistic locking. */
db_client_id client = globally_unique_id();

/* Construct the argument arrays for RAY.CONNECT. */
int argc = num_args + 4;
const char **argv = malloc(sizeof(char *) * argc);
size_t *argvlen = malloc(sizeof(size_t) * argc);
/* Set the command name argument. */
argv[0] = "RAY.CONNECT";
argvlen[0] = strlen(argv[0]);
/* Set the client ID argument. */
argv[1] = (char *) client.id;
argvlen[1] = sizeof(db->client.id);
/* Set the node IP address argument. */
argv[2] = node_ip_address;
argvlen[2] = strlen(node_ip_address);
/* Set the client type argument. */
argv[3] = client_type;
argvlen[3] = strlen(client_type);
/* Set the remaining arguments. */
for (int i = 0; i < num_args; ++i) {
if (args[i] == NULL) {
LOG_FATAL("Element %d of the args array passed to db_connect was NULL.",
i);
}
argv[4 + i] = args[i];
argvlen[4 + i] = strlen(args[i]);
}

/* Register this client with Redis. RAY.CONNECT is a custom Redis command that
* we've defined. */
reply = redisCommand(context, "RAY.CONNECT %s %s:%d %b %s", client_type,
client_addr, client_port, (char *) client.id,
sizeof(client.id), aux_address);
reply = redisCommandArgv(context, argc, argv, argvlen);
CHECKM(reply != NULL, "db_connect failed on RAY.CONNECT");
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECK(strcmp(reply->str, "OK") == 0);
freeReplyObject(reply);
free(argv);
free(argvlen);

db->client_type = strdup(client_type);
db->client = client;
db->db_client_cache = NULL;
db->sync_context = context;

/* Establish async connection */
db->context = redisAsyncConnect(address, port);
db->context = redisAsyncConnect(db_address, db_port);
CHECK_REDIS_CONNECT(redisAsyncContext, db->context,
"could not establish asynchronous connection to redis "
"%s:%d",
address, port);
db_address, db_port);
db->context->data = (void *) db;
/* Establish async connection for subscription */
db->sub_context = redisAsyncConnect(address, port);
db->sub_context = redisAsyncConnect(db_address, db_port);
CHECK_REDIS_CONNECT(redisAsyncContext, db->sub_context,
"could not establish asynchronous subscription "
"connection to redis %s:%d",
address, port);
db_address, db_port);
db->sub_context->data = (void *) db;

return db;
Expand Down Expand Up @@ -519,8 +539,8 @@ void redis_get_cached_db_client(db_handle *db,
redisReply *reply =
redisCommand(db->sync_context, "RAY.GET_CLIENT_ADDRESS %b",
(char *) db_client_id.id, sizeof(db_client_id.id));
CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d",
reply->type);
CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d, str=%s",
reply->type, reply->str);
entry = malloc(sizeof(db_client_cache_entry));
entry->db_client_id = db_client_id;
entry->addr = strdup(reply->str);
Expand Down
19 changes: 13 additions & 6 deletions src/common/test/db_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "state/redis.h"
#include "task.h"

#include "utstring.h"

SUITE(db_tests);

/* Retry 10 times with a 100ms timeout. */
Expand Down Expand Up @@ -65,10 +67,14 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) {

TEST object_table_lookup_test(void) {
event_loop *loop = event_loop_create();
/* This uses manager_port1. */
const char *db_connect_args1[] = {"address", "127.0.0.1:12345"};
db_handle *db1 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr,
manager_port1);
2, db_connect_args1);
/* This uses manager_port2. */
const char *db_connect_args2[] = {"address", "127.0.0.1:12346"};
db_handle *db2 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr,
manager_port2);
2, db_connect_args2);
db_attach(db1, loop, false);
db_attach(db2, loop, false);
unique_id id = globally_unique_id();
Expand Down Expand Up @@ -138,7 +144,8 @@ void task_table_test_callback(task *callback_task, void *user_data) {
TEST task_table_test(void) {
task_table_test_callback_called = 0;
event_loop *loop = event_loop_create();
db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1);
db_handle *db =
db_connect("127.0.0.1", 6379, "local_scheduler", "127.0.0.1", 0, NULL);
db_attach(db, loop, false);
node_id node = globally_unique_id();
task_spec *spec = example_task_spec(1, 1);
Expand Down Expand Up @@ -170,7 +177,8 @@ void task_table_all_test_callback(task *task, void *user_data) {

TEST task_table_all_test(void) {
event_loop *loop = event_loop_create();
db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1);
db_handle *db =
db_connect("127.0.0.1", 6379, "local_scheduler", "127.0.0.1", 0, NULL);
db_attach(db, loop, false);
task_spec *spec = example_task_spec(1, 1);
/* Schedule two tasks on different nodes. */
Expand Down Expand Up @@ -204,8 +212,7 @@ TEST unique_client_id_test(void) {
db_client_id ids[num_conns];
db_handle *db;
for (int i = 0; i < num_conns; ++i) {
db = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr,
manager_port1);
db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL);
ids[i] = get_db_client_id(db);
db_disconnect(db);
}
Expand Down
Loading