Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ include(${CMAKE_CURRENT_LIST_DIR}/cmake/Common.cmake)

add_subdirectory(redis_module)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")

include_directories(thirdparty/ae)

add_custom_target(
Expand All @@ -14,26 +16,26 @@ add_custom_target(
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/thirdparty/hiredis)

add_library(common STATIC
event_loop.c
common.c
task.c
io.c
net.c
logging.c
state/redis.c
state/table.c
state/object_table.c
state/task_table.c
state/db_client_table.c
state/actor_notification_table.c
state/local_scheduler_table.c
event_loop.cc
common.cc
task.cc
io.cc
net.cc
logging.cc
state/redis.cc
state/table.cc
state/object_table.cc
state/task_table.cc
state/db_client_table.cc
state/actor_notification_table.cc
state/local_scheduler_table.cc
thirdparty/ae/ae.c
thirdparty/sha256.c)

target_link_libraries(common "${CMAKE_CURRENT_LIST_DIR}/thirdparty/hiredis/libhiredis.a")

function(define_test test_name library)
add_executable(${test_name} test/${test_name}.c ${ARGN})
add_executable(${test_name} test/${test_name}.cc ${ARGN})
add_dependencies(${test_name} hiredis flatbuffers_ep)
target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ${library})
target_compile_options(${test_name} PUBLIC "-DPLASMA_TEST -DLOCAL_SCHEDULER_TEST -DCOMMON_TEST -DRAY_COMMON_LOG_LEVEL=4 -DRAY_TIMEOUT=50")
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions src/common/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

#include <stdint.h>

extern "C" {
#ifdef _WIN32
/* Quirks mean that Windows version needs to be included differently */
#include <hiredis/hiredis.h>
#include <ae.h>
#else
#include "ae/ae.h"
#endif
}

/* Unique timer ID that will be generated when the timer is added to the
* event loop. Will not be reused later on in another call
Expand Down
2 changes: 1 addition & 1 deletion src/common/io.c → src/common/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) {
return 0;
}

void write_log_message(int fd, char *message) {
void write_log_message(int fd, const char *message) {
/* Account for the \0 at the end of the string. */
write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message);
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer);
/**
* Write a null-terminated string to a file descriptor.
*/
void write_log_message(int fd, char *message);
void write_log_message(int fd, const char *message);

void write_formatted_log_message(int fd, const char *format, ...);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
}
PyTask *result = PyObject_New(PyTask, &PyTaskType);
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->spec = malloc(size);
result->spec = (task_spec *) malloc(size);
memcpy(result->spec, data, size);
/* TODO(pcm): Better error checking once we use flatbuffers. */
if (size != task_spec_size(result->spec)) {
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ void actor_notification_table_subscribe(
void *subscribe_context,
RetryInfo *retry) {
ActorNotificationTableSubscribeData *sub_data =
malloc(sizeof(ActorNotificationTableSubscribeData));
(ActorNotificationTableSubscribeData *) malloc(
sizeof(ActorNotificationTableSubscribeData));
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ void db_client_table_subscribe(
db_client_table_done_callback done_callback,
void *user_context) {
DBClientTableSubscribeData *sub_data =
malloc(sizeof(DBClientTableSubscribeData));
(DBClientTableSubscribeData *) malloc(sizeof(DBClientTableSubscribeData));
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;

init_table_callback(db_handle, NIL_ID, __func__, sub_data, retry,
done_callback, redis_db_client_table_subscribe,
user_context);
(table_done_callback) done_callback,
redis_db_client_table_subscribe, user_context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ void local_scheduler_table_subscribe(
void *subscribe_context,
RetryInfo *retry) {
LocalSchedulerTableSubscribeData *sub_data =
malloc(sizeof(LocalSchedulerTableSubscribeData));
(LocalSchedulerTableSubscribeData *) malloc(
sizeof(LocalSchedulerTableSubscribeData));
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;

Expand All @@ -19,7 +20,8 @@ void local_scheduler_table_send_info(DBHandle *db_handle,
LocalSchedulerInfo *info,
RetryInfo *retry) {
LocalSchedulerTableSendInfoData *data =
malloc(sizeof(LocalSchedulerTableSendInfoData));
(LocalSchedulerTableSendInfoData *) malloc(
sizeof(LocalSchedulerTableSendInfoData));
data->info = *info;

init_table_callback(db_handle, NIL_ID, __func__, data, retry, NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ void object_table_lookup(DBHandle *db_handle,
void *user_context) {
CHECK(db_handle != NULL);
init_table_callback(db_handle, object_id, __func__, NULL, retry,
done_callback, redis_object_table_lookup, user_context);
(table_done_callback) done_callback,
redis_object_table_lookup, user_context);
}

void object_table_add(DBHandle *db_handle,
Expand All @@ -21,11 +22,13 @@ void object_table_add(DBHandle *db_handle,
void *user_context) {
CHECK(db_handle != NULL);

ObjectTableAddData *info = malloc(sizeof(ObjectTableAddData));
ObjectTableAddData *info =
(ObjectTableAddData *) malloc(sizeof(ObjectTableAddData));
info->object_size = object_size;
memcpy(&info->digest[0], digest, DIGEST_SIZE);
init_table_callback(db_handle, object_id, __func__, info, retry,
done_callback, redis_object_table_add, user_context);
(table_done_callback) done_callback,
redis_object_table_add, user_context);
}

void object_table_remove(DBHandle *db_handle,
Expand All @@ -38,11 +41,12 @@ void object_table_remove(DBHandle *db_handle,
/* Copy the client ID, if one was provided. */
DBClientID *client_id_copy = NULL;
if (client_id != NULL) {
client_id_copy = malloc(sizeof(DBClientID));
client_id_copy = (DBClientID *) malloc(sizeof(DBClientID));
*client_id_copy = *client_id;
}
init_table_callback(db_handle, object_id, __func__, client_id_copy, retry,
done_callback, redis_object_table_remove, user_context);
(table_done_callback) done_callback,
redis_object_table_remove, user_context);
}

void object_table_subscribe_to_notifications(
Expand All @@ -54,14 +58,16 @@ void object_table_subscribe_to_notifications(
object_table_lookup_done_callback done_callback,
void *user_context) {
CHECK(db_handle != NULL);
ObjectTableSubscribeData *sub_data = malloc(sizeof(ObjectTableSubscribeData));
ObjectTableSubscribeData *sub_data =
(ObjectTableSubscribeData *) malloc(sizeof(ObjectTableSubscribeData));
sub_data->object_available_callback = object_available_callback;
sub_data->subscribe_context = subscribe_context;
sub_data->subscribe_all = subscribe_all;

init_table_callback(
db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, done_callback,
redis_object_table_subscribe_to_notifications, user_context);
init_table_callback(db_handle, NIL_OBJECT_ID, __func__, sub_data, retry,
(table_done_callback) done_callback,
redis_object_table_subscribe_to_notifications,
user_context);
}

void object_table_request_notifications(DBHandle *db_handle,
Expand All @@ -71,8 +77,9 @@ void object_table_request_notifications(DBHandle *db_handle,
CHECK(db_handle != NULL);
CHECK(num_object_ids > 0);
ObjectTableRequestNotificationsData *data =
malloc(sizeof(ObjectTableRequestNotificationsData) +
num_object_ids * sizeof(ObjectID));
(ObjectTableRequestNotificationsData *) malloc(
sizeof(ObjectTableRequestNotificationsData) +
num_object_ids * sizeof(ObjectID));
data->num_object_ids = num_object_ids;
memcpy(data->object_ids, object_ids, num_object_ids * sizeof(ObjectID));

Expand All @@ -86,12 +93,14 @@ void object_info_subscribe(DBHandle *db_handle,
RetryInfo *retry,
object_info_done_callback done_callback,
void *user_context) {
ObjectInfoSubscribeData *sub_data = malloc(sizeof(ObjectInfoSubscribeData));
ObjectInfoSubscribeData *sub_data =
(ObjectInfoSubscribeData *) malloc(sizeof(ObjectInfoSubscribeData));
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;

init_table_callback(db_handle, NIL_OBJECT_ID, __func__, sub_data, retry,
done_callback, redis_object_info_subscribe, user_context);
(table_done_callback) done_callback,
redis_object_info_subscribe, user_context);
}

void result_table_add(DBHandle *db_handle,
Expand All @@ -100,10 +109,11 @@ void result_table_add(DBHandle *db_handle,
RetryInfo *retry,
result_table_done_callback done_callback,
void *user_context) {
TaskID *task_id_copy = malloc(sizeof(TaskID));
TaskID *task_id_copy = (TaskID *) malloc(sizeof(TaskID));
memcpy(task_id_copy, task_id_arg.id, sizeof(*task_id_copy));
init_table_callback(db_handle, object_id, __func__, task_id_copy, retry,
done_callback, redis_result_table_add, user_context);
(table_done_callback) done_callback,
redis_result_table_add, user_context);
}

void result_table_lookup(DBHandle *db_handle,
Expand All @@ -112,5 +122,6 @@ void result_table_lookup(DBHandle *db_handle,
result_table_lookup_callback done_callback,
void *user_context) {
init_table_callback(db_handle, object_id, __func__, NULL, retry,
done_callback, redis_result_table_lookup, user_context);
(table_done_callback) done_callback,
redis_result_table_lookup, user_context);
}
4 changes: 4 additions & 0 deletions src/common/state/redis.c → src/common/state/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
#include <stdbool.h>
#include <stdlib.h>
#include <unistd.h>

extern "C" {
/* Including hiredis here is necessary on Windows for typedefs used in ae.h. */
#include "hiredis/hiredis.h"
#include "hiredis/adapters/ae.h"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need the extern C here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do not convert hiredis to C++


#include "utstring.h"

#include "common.h"
Expand Down
3 changes: 2 additions & 1 deletion src/common/state/table.c → src/common/state/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ TableCallbackData *init_table_callback(DBHandle *db_handle,
}
CHECK(retry);
/* Allocate and initialize callback data structure for object table */
TableCallbackData *callback_data = malloc(sizeof(TableCallbackData));
TableCallbackData *callback_data =
(TableCallbackData *) malloc(sizeof(TableCallbackData));
CHECKM(callback_data != NULL, "Memory allocation error!")
callback_data->id = id;
callback_data->label = label;
Expand Down
25 changes: 15 additions & 10 deletions src/common/state/task_table.c → src/common/state/task_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
void task_table_get_task(DBHandle *db_handle,
TaskID task_id,
RetryInfo *retry,
task_table_get_callback done_callback,
task_table_get_callback get_callback,
void *user_context) {
init_table_callback(db_handle, task_id, __func__, NULL, retry, done_callback,
redis_task_table_get_task, user_context);
init_table_callback(db_handle, task_id, __func__, NULL, retry,
(void *) get_callback, redis_task_table_get_task,
user_context);
}

void task_table_add_task(DBHandle *db_handle,
Expand All @@ -18,7 +19,8 @@ void task_table_add_task(DBHandle *db_handle,
task_table_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, Task_task_id(task), __func__, task, retry,
done_callback, redis_task_table_add_task, user_context);
(table_done_callback) done_callback,
redis_task_table_add_task, user_context);
}

void task_table_update(DBHandle *db_handle,
Expand All @@ -27,7 +29,8 @@ void task_table_update(DBHandle *db_handle,
task_table_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, Task_task_id(task), __func__, task, retry,
done_callback, redis_task_table_update, user_context);
(table_done_callback) done_callback,
redis_task_table_update, user_context);
}

void task_table_test_and_update(DBHandle *db_handle,
Expand All @@ -38,14 +41,14 @@ void task_table_test_and_update(DBHandle *db_handle,
task_table_get_callback done_callback,
void *user_context) {
TaskTableTestAndUpdateData *update_data =
malloc(sizeof(TaskTableTestAndUpdateData));
(TaskTableTestAndUpdateData *) malloc(sizeof(TaskTableTestAndUpdateData));
update_data->test_state_bitmask = test_state_bitmask;
update_data->update_state = update_state;
/* Update the task entry's local scheduler with this client's ID. */
update_data->local_scheduler_id = db_handle->client;
init_table_callback(db_handle, task_id, __func__, update_data, retry,
done_callback, redis_task_table_test_and_update,
user_context);
(table_done_callback) done_callback,
redis_task_table_test_and_update, user_context);
}

/* TODO(swang): A corresponding task_table_unsubscribe. */
Expand All @@ -57,12 +60,14 @@ void task_table_subscribe(DBHandle *db_handle,
RetryInfo *retry,
task_table_done_callback done_callback,
void *user_context) {
TaskTableSubscribeData *sub_data = malloc(sizeof(TaskTableSubscribeData));
TaskTableSubscribeData *sub_data =
(TaskTableSubscribeData *) malloc(sizeof(TaskTableSubscribeData));
sub_data->local_scheduler_id = local_scheduler_id;
sub_data->state_filter = state_filter;
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;

init_table_callback(db_handle, local_scheduler_id, __func__, sub_data, retry,
done_callback, redis_task_table_subscribe, user_context);
(table_done_callback) done_callback,
redis_task_table_subscribe, user_context);
}
2 changes: 1 addition & 1 deletion src/common/state/task_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ typedef void (*task_table_get_callback)(Task *task, void *user_context);
void task_table_get_task(DBHandle *db,
TaskID task_id,
RetryInfo *retry,
task_table_get_callback done_callback,
task_table_get_callback get_callback,
void *user_context);

/**
Expand Down
2 changes: 2 additions & 0 deletions src/common/task.c → src/common/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#include <stdio.h>
#include <string.h>

extern "C" {
#include "sha256.h"
}
#include "utarray.h"

#include "task.h"
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions src/common/test/db_tests.c → src/common/test/db_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ Task *task_table_test_task;
void task_table_test_fail_callback(UniqueID id,
void *context,
void *user_data) {
event_loop *loop = user_data;
event_loop *loop = (event_loop *) user_data;
event_loop_stop(loop);
}

int64_t task_table_delayed_add_task(event_loop *loop,
int64_t id,
void *context) {
DBHandle *db = context;
DBHandle *db = (DBHandle *) context;
RetryInfo retry = {
.num_retries = NUM_RETRIES,
.timeout = TIMEOUT,
Expand All @@ -138,7 +138,7 @@ void task_table_test_callback(Task *callback_task, void *user_data) {
CHECK(Task_size(callback_task) == Task_size(task_table_test_task));
CHECK(memcmp(callback_task, task_table_test_task, Task_size(callback_task)) ==
0);
event_loop *loop = user_data;
event_loop *loop = (event_loop *) user_data;
event_loop_stop(loop);
}

Expand Down
Loading