Skip to content

Commit

Permalink
Merge pull request cpp-redis#3 from cpp-redis/appkins/streams
Browse files Browse the repository at this point in the history
Appkins/streams
  • Loading branch information
appkins authored Dec 10, 2018
2 parents bec42d9 + e66feb3 commit 543b4d7
Show file tree
Hide file tree
Showing 38 changed files with 3,111 additions and 2,252 deletions.
184 changes: 92 additions & 92 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,33 @@ project(${PROJECT} CXX)
###
# compilation options
###
if(WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2 /bigobj")
if (WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2 /bigobj")

# was causing conflics with gtest build
string(REPLACE "/RTC1" "" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG})
# was causing conflics with gtest build
string(REPLACE "/RTC1" "" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG})

if("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "")
set(MSVC_RUNTIME_LIBRARY_CONFIG "/MT")
endif()
if ("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "")
set(MSVC_RUNTIME_LIBRARY_CONFIG "/MT")
endif ()

foreach(flag_var CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_RELEASE)
if("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MT")
string(REPLACE "/MD" "/MT" ${flag_var} "${${flag_var}}")
elseif("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MD")
string(REPLACE "/MT" "/MD" ${flag_var} "${${flag_var}}")
else()
string(REPLACE "/MD" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
string(REPLACE "/MT" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
endif()
endforeach()
foreach (flag_var CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_RELEASE)
if ("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MT")
string(REPLACE "/MD" "/MT" ${flag_var} "${${flag_var}}")
elseif ("${MSVC_RUNTIME_LIBRARY_CONFIG}" STREQUAL "/MD")
string(REPLACE "/MT" "/MD" ${flag_var} "${${flag_var}}")
else ()
string(REPLACE "/MD" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
string(REPLACE "/MT" "${MSVC_RUNTIME_LIBRARY_CONFIG}" ${flag_var} "${${flag_var}}")
endif ()
endforeach ()

add_definitions(-D_UNICODE)
add_definitions(-DUNICODE)
add_definitions(-DWIN32_LEAN_AND_MEAN)
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3")
endif(WIN32)
add_definitions(-D_UNICODE)
add_definitions(-DUNICODE)
add_definitions(-DWIN32_LEAN_AND_MEAN)
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3")
endif (WIN32)


###
Expand All @@ -81,9 +81,9 @@ find_library(TACOPIE_LIBRARY tacopie)
set(CPP_REDIS_INCLUDES ${PROJECT_SOURCE_DIR}/includes)
set(DEPS_INCLUDES ${PROJECT_SOURCE_DIR}/deps/include)

if(NOT USE_CUSTOM_TCP_CLIENT)
set(DEPS_INCLUDES ${DEPS_INCLUDES} ${TACOPIE_INCLUDE_DIR})
endif()
if (NOT USE_CUSTOM_TCP_CLIENT)
set(DEPS_INCLUDES ${DEPS_INCLUDES} ${TACOPIE_INCLUDE_DIR})
endif ()

set(DEPS_LIBRARIES ${PROJECT_SOURCE_DIR}/deps/lib)

Expand All @@ -98,31 +98,31 @@ include_directories(${CPP_REDIS_INCLUDES} ${DEPS_INCLUDES})
# sources
###
set(SRC_DIRS "sources"
"sources/builders"
"sources/core"
"sources/misc"
"sources/network"
"includes/cpp_redis"
"includes/cpp_redis/builders"
"includes/cpp_redis/core"
"includes/cpp_redis/misc"
"includes/cpp_redis/network")

foreach(dir ${SRC_DIRS})
# get directory sources and headers
file(GLOB s_${dir} "${dir}/*.cpp")
file(GLOB h_${dir} "${dir}/*.hpp")
file(GLOB i_${dir} "${dir}/*.ipp")

# set sources
set(SOURCES ${SOURCES} ${s_${dir}} ${h_${dir}} ${i_${dir}})
endforeach()
"sources/builders"
"sources/core"
"sources/misc"
"sources/network"
"includes/cpp_redis"
"includes/cpp_redis/builders"
"includes/cpp_redis/core"
"includes/cpp_redis/misc"
"includes/cpp_redis/network")

foreach (dir ${SRC_DIRS})
# get directory sources and headers
file(GLOB s_${dir} "${dir}/*.cpp")
file(GLOB h_${dir} "${dir}/*.hpp")
file(GLOB i_${dir} "${dir}/*.ipp")

# set sources
set(SOURCES ${SOURCES} ${s_${dir}} ${h_${dir}} ${i_${dir}})
endforeach ()
# filter tcp_client if no tacopie
if(USE_CUSTOM_TCP_CLIENT)
file(GLOB tacopie_cpp "sources/network/tcp_client.cpp")
file(GLOB tacopie_h "includes/cpp_redis/network/tcp_client.hpp")
list(REMOVE_ITEM SOURCES ${tacopie_cpp} ${tacopie_h})
endif(USE_CUSTOM_TCP_CLIENT)
if (USE_CUSTOM_TCP_CLIENT)
file(GLOB tacopie_cpp "sources/network/tcp_client.cpp")
file(GLOB tacopie_h "includes/cpp_redis/network/tcp_client.hpp")
list(REMOVE_ITEM SOURCES ${tacopie_cpp} ${tacopie_h})
endif (USE_CUSTOM_TCP_CLIENT)


###
Expand All @@ -144,39 +144,39 @@ configure_file("cpp_redis.pc.in" "${CMAKE_PKGCONFIG_OUTPUT_DIRECTORY}/cpp_redis.
add_library(${PROJECT} ${SOURCES})
set_property(TARGET ${PROJECT} PROPERTY POSITION_INDEPENDENT_CODE ON)

if(WIN32)
set_target_properties(${PROJECT}
PROPERTIES COMPILE_PDB_NAME ${PROJECT}
COMPILE_PDB_OUTPUT_DIRECTORY ${CMAKE_LIBRARY_OUTPUT_DIRECTORY})
endif(WIN32)
if (WIN32)
set_target_properties(${PROJECT}
PROPERTIES COMPILE_PDB_NAME ${PROJECT}
COMPILE_PDB_OUTPUT_DIRECTORY ${CMAKE_LIBRARY_OUTPUT_DIRECTORY})
endif (WIN32)

if(WIN32)
target_link_libraries(${PROJECT} ws2_32)
else()
target_link_libraries(${PROJECT} pthread)
endif(WIN32)
if (WIN32)
target_link_libraries(${PROJECT} ws2_32)
else ()
target_link_libraries(${PROJECT} pthread)
endif (WIN32)

if(TACOPIE_LIBRARY)
target_link_libraries(${PROJECT} ${TACOPIE_LIBRARY})
else()
target_link_libraries(${PROJECT} tacopie)
endif(TACOPIE_LIBRARY)
if (TACOPIE_LIBRARY)
target_link_libraries(${PROJECT} ${TACOPIE_LIBRARY})
else ()
target_link_libraries(${PROJECT} tacopie)
endif (TACOPIE_LIBRARY)


# __CPP_REDIS_READ_SIZE
if(READ_SIZE)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_READ_SIZE=${READ_SIZE}")
endif(READ_SIZE)
if (READ_SIZE)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_READ_SIZE=${READ_SIZE}")
endif (READ_SIZE)

# __CPP_REDIS_LOGGING_ENABLED
if(LOGGING_ENABLED)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}")
endif(LOGGING_ENABLED)
if (LOGGING_ENABLED)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}")
endif (LOGGING_ENABLED)

# __CPP_REDIS_USE_CUSTOM_TCP_CLIENT
if(USE_CUSTOM_TCP_CLIENT)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_USE_CUSTOM_TCP_CLIENT=${USE_CUSTOM_TCP_CLIENT}")
endif(USE_CUSTOM_TCP_CLIENT)
if (USE_CUSTOM_TCP_CLIENT)
set_property(TARGET ${PROJECT} APPEND_STRING PROPERTY COMPILE_DEFINITIONS " __CPP_REDIS_USE_CUSTOM_TCP_CLIENT=${USE_CUSTOM_TCP_CLIENT}")
endif (USE_CUSTOM_TCP_CLIENT)


###
Expand All @@ -194,30 +194,30 @@ install(DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include USE_SOURCE_PERMISSI
###
# examples
###
if(BUILD_EXAMPLES)
add_subdirectory(examples)
# Reset variable to false to ensure tacopie does no build examples
set(BUILD_EXAMPLES false)
endif(BUILD_EXAMPLES)
if (BUILD_EXAMPLES)
add_subdirectory(examples)
# Reset variable to false to ensure tacopie does no build examples
set(BUILD_EXAMPLES false)
endif (BUILD_EXAMPLES)

###
# tests
###
if(BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
ExternalProject_Add("googletest"
GIT_REPOSITORY "https://github.com/google/googletest.git"
CMAKE_ARGS "-DCMAKE_INSTALL_PREFIX=${PROJECT_SOURCE_DIR}/deps")
# Reset variable to false to ensure tacopie does no build tests
set(BUILD_TESTS false)
endif(BUILD_TESTS)
if (BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
ExternalProject_Add("googletest"
GIT_REPOSITORY "https://github.com/google/googletest.git"
CMAKE_ARGS "-DCMAKE_INSTALL_PREFIX=${PROJECT_SOURCE_DIR}/deps")
# Reset variable to false to ensure tacopie does no build tests
set(BUILD_TESTS false)
endif (BUILD_TESTS)


###
# tacopie
###
if(NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
set(SOURCES) # reset the SOURCES var so that the tacopie project won't include the cpp_redis sources too
add_subdirectory(tacopie)
endif(NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
if (NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
set(SOURCES) # reset the SOURCES var so that the tacopie project won't include the cpp_redis sources too
add_subdirectory(tacopie)
endif (NOT TACOPIE_LIBRARY AND NOT USE_CUSTOM_TCP_CLIENT)
73 changes: 26 additions & 47 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,50 +39,29 @@ include_directories(${CPP_REDIS_INCLUDES})
###
link_directories(${DEPS_LIBRARIES})


###
# executable
###
add_executable(cpp_redis_client cpp_redis_client.cpp)
target_link_libraries(cpp_redis_client cpp_redis)

add_executable(cpp_redis_consumer cpp_redis_consumer.cpp)
target_link_libraries(cpp_redis_consumer cpp_redis)

add_executable(cpp_redis_future_client cpp_redis_future_client.cpp)
target_link_libraries(cpp_redis_future_client cpp_redis)

add_executable(cpp_redis_subscriber cpp_redis_subscriber.cpp)
target_link_libraries(cpp_redis_subscriber cpp_redis)

add_executable(cpp_redis_logger cpp_redis_logger.cpp)
target_link_libraries(cpp_redis_logger cpp_redis)

add_executable(cpp_redis_kill cpp_redis_kill.cpp)
target_link_libraries(cpp_redis_kill cpp_redis)

add_executable(cpp_redis_streams_client cpp_redis_streams_client.cpp)
target_link_libraries(cpp_redis_streams_client cpp_redis)

add_executable(cpp_redis_high_availability_client cpp_redis_high_availability_client.cpp)
target_link_libraries(cpp_redis_high_availability_client cpp_redis)


###
# link libs
###
if(WIN32)
target_link_libraries(cpp_redis_client ws2_32)
target_link_libraries(cpp_redis_future_client ws2_32)
target_link_libraries(cpp_redis_subscriber ws2_32)
target_link_libraries(cpp_redis_logger ws2_32)
target_link_libraries(cpp_redis_kill ws2_32)
target_link_libraries(cpp_redis_high_availability_client ws2_32)
else()
target_link_libraries(cpp_redis_client pthread)
target_link_libraries(cpp_redis_future_client pthread)
target_link_libraries(cpp_redis_subscriber pthread)
target_link_libraries(cpp_redis_logger pthread)
target_link_libraries(cpp_redis_kill pthread)
target_link_libraries(cpp_redis_high_availability_client pthread)
endif(WIN32)
set(EXAMPLES cpp_redis_client
cpp_redis_consumer
cpp_redis_future_client
cpp_redis_subscriber
cpp_redis_logger
cpp_redis_kill
cpp_redis_streams_client
cpp_redis_high_availability_client
)

foreach(EXAMPLE IN ITEMS ${EXAMPLES})
###
# executable
###
add_executable(${EXAMPLE} ${EXAMPLE}.cpp)
target_link_libraries(${EXAMPLE} cpp_redis)

###
# link libs
###
if(WIN32)
target_link_libraries(${EXAMPLE} ws2_32)
else()
target_link_libraries(${EXAMPLE} pthread)
endif(WIN32)
endforeach(EXAMPLE)
2 changes: 1 addition & 1 deletion examples/cpp_redis_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ main(void) {
}
});

auto replcmd = [](cpp_redis::reply &reply) {
auto replcmd = [](const cpp_redis::reply &reply) {
std::cout << "set hello 42: " << reply << std::endl;
// if (reply.is_string())
// do_something_with_string(reply.as_string())
Expand Down
38 changes: 32 additions & 6 deletions examples/cpp_redis_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ sigint_handler(int) {
}

int
main(void) {
main() {
#ifdef _WIN32
//! Windows netword DLL init
WORD version = MAKEWORD(2, 2);
Expand All @@ -53,7 +53,8 @@ main(void) {

//! Enable logging

const std::string group_name = "groupone";
//const std::string group_name = "groupone";
const std::vector<std::string> group_names = {"groupone"}; //, "grouptwo"};
const std::string session_name = "sessone";
const std::string consumer_name = "ABCD";

Expand All @@ -68,11 +69,36 @@ main(void) {
}
});

sub.subscribe(group_name, [](const cpp_redis::message_type msg){
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
sub.auth("{redis_key}");

for (auto &group : group_names) {

sub.subscribe(group,
[group](const cpp_redis::message_type msg) {
cpp_redis::consumer_response_t res;
// Callback will run for each message obtained from the queue
std::cout << "Group: " << group << std::endl;
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
res.insert({"Id", msg.get_id()});
return res;
},
[group](int ack_status) {
// Callback will run upon return of xack
std::cout << "Group: " << group << std::endl;
std::cout << "Ack status: " << ack_status << std::endl;
});
}

return msg;
});
/*sub.subscribe(group_name,
[](const cpp_redis::message_type msg) {
// Callback will run for each message obtained from the queue
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
return msg;
},
[](int ack_status) {
// Callback will run upon return of xack
std::cout << "Ack status: " << ack_status << std::endl;
});*/

sub.commit();

Expand Down
2 changes: 1 addition & 1 deletion examples/cpp_redis_streams_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#endif /* _WIN32 */

int
main(void) {
main() {
#ifdef _WIN32
//! Windows netword DLL init
WORD version = MAKEWORD(2, 2);
Expand Down
Loading

0 comments on commit 543b4d7

Please sign in to comment.