Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ if(WIN32)
add_definitions(-DNDEBUG)
endif()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /EHsc /MP /bigobj")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /EHsc /MP /bigobj /utf-8")
foreach(warning 4244 4251 4267 4275 4290 4786 4305 4996)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd${warning}")
endforeach(warning)
Expand Down Expand Up @@ -258,7 +258,11 @@ if(CSP_BUILD_ARROW_ADAPTER)
endif()

if(CSP_BUILD_KAFKA_ADAPTER)
find_package(DepsKafkaAdapter REQUIRED)
find_package(DepsKafkaAdapter)
if(NOT DepsKafkaAdapter_FOUND)
message(STATUS "Kafka adapter disabled - dependencies not available")
set(CSP_BUILD_KAFKA_ADAPTER OFF)
endif()
endif()

if(CSP_BUILD_PARQUET_ADAPTER)
Expand Down
3,139 changes: 2,713 additions & 426 deletions NOTICE

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ channels:
- nodefaults
dependencies:
- astor
- avrocpp
- bison
- brotli
- bump-my-version
Expand Down
3 changes: 2 additions & 1 deletion conda/dev-environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ channels:
- conda-forge
- nodefaults
dependencies:
# - bison # not available on windows
- astor
- avrocpp
# - bison # not available on windows
- brotli
- bump-my-version
- cmake
Expand Down
51 changes: 51 additions & 0 deletions cpp/cmake/modules/FindAvro.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
find_path(Avro_INCLUDE_DIR NAMES avro/Encoder.hh)
find_library(Avro_LIBRARY NAMES avrocpp libavrocpp)

# =============================================================================
# Check for conda-forge avro-cpp fmt::formatter incompatibility on Windows
# =============================================================================
# conda-forge's avro-cpp has fmt::formatter specializations with non-const
# format() methods, but fmt v12+ requires const. This causes MSVC error C2766.
#
# If detected, Avro_FOUND is set to FALSE and Kafka adapter will be disabled.
# =============================================================================

set(Avro_COMPATIBLE TRUE)

if(WIN32 AND Avro_INCLUDE_DIR AND NOT CSP_USE_VCPKG)
set(_avro_node_hh "${Avro_INCLUDE_DIR}/avro/Node.hh")
if(EXISTS "${_avro_node_hh}")
file(READ "${_avro_node_hh}" _node_hh_content)
string(FIND "${_node_hh_content}" "fmt::formatter<avro::Name>" _has_formatter)
if(NOT _has_formatter EQUAL -1)
# Check for non-const format() - the bug pattern
string(REGEX MATCH "auto format\\([^)]+\\)[^c]*\\{" _buggy_pattern "${_node_hh_content}")
if(_buggy_pattern)
set(Avro_COMPATIBLE FALSE)
message(WARNING
"avro-cpp has incompatible fmt::formatter (non-const format()). "
"Kafka adapter will be disabled. Update avro-cpp when conda-forge releases a fix.")
endif()
endif()
endif()
endif()

if(Avro_COMPATIBLE AND Avro_INCLUDE_DIR AND Avro_LIBRARY)
if(NOT TARGET Avro::avrocpp)
add_library(Avro::avrocpp SHARED IMPORTED)
if(WIN32)
set_property(TARGET Avro::avrocpp PROPERTY IMPORTED_IMPLIB "${Avro_LIBRARY}")
else()
set_property(TARGET Avro::avrocpp PROPERTY IMPORTED_LOCATION "${Avro_LIBRARY}")
endif()
target_include_directories(Avro::avrocpp INTERFACE ${Avro_INCLUDE_DIR})
endif()
endif()

include(FindPackageHandleStandardArgs)
if(Avro_COMPATIBLE)
find_package_handle_standard_args(Avro DEFAULT_MSG Avro_LIBRARY Avro_INCLUDE_DIR)
else()
set(Avro_FOUND FALSE)
endif()
mark_as_advanced(Avro_INCLUDE_DIR Avro_LIBRARY)
23 changes: 22 additions & 1 deletion cpp/cmake/modules/FindDepsKafkaAdapter.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,32 @@ cmake_minimum_required(VERSION 3.7.2)

if (CSP_USE_VCPKG)
find_package(RdKafka CONFIG REQUIRED)
if(NOT WIN32)
find_package(unofficial-avro-cpp CONFIG REQUIRED)

# Check avro-cpp version on Windows - require >= 1.12.1 for fmt v12 compatibility
# avro-cpp 1.12.0 has fmt::formatter with non-const format() which causes MSVC C2766
if(WIN32 AND unofficial-avro-cpp_VERSION VERSION_LESS "1.12.1")
message(WARNING
"avro-cpp ${unofficial-avro-cpp_VERSION} has incompatible fmt::formatter on Windows. "
"Kafka adapter will be disabled. Update vcpkg to get avro-cpp >= 1.12.1.")
set(DepsKafkaAdapter_FOUND FALSE)
return()
endif()

if(NOT WIN32)
# Bad, but a temporary workaround for
# https://github.com/microsoft/vcpkg/issues/40320
link_directories(${VCPKG_INSTALLED_DIR}/${VCPKG_TARGET_TRIPLET}/lib)
endif()
set(CSP_AVRO_TARGET unofficial::avro-cpp::avrocpp CACHE INTERNAL "")
set(DepsKafkaAdapter_FOUND TRUE)
else()
find_package(RdKafka REQUIRED)
find_package(Avro)
if(NOT Avro_FOUND)
set(DepsKafkaAdapter_FOUND FALSE)
else()
set(CSP_AVRO_TARGET Avro::avrocpp CACHE INTERNAL "")
set(DepsKafkaAdapter_FOUND TRUE)
endif()
endif()
2 changes: 2 additions & 0 deletions cpp/csp/adapters/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ find_package(DepsKafkaAdapter REQUIRED)

target_link_libraries(csp_kafka_adapter PUBLIC csp_adapter_utils RdKafka::rdkafka RdKafka::rdkafka++)

target_link_libraries(csp_kafka_adapter PUBLIC ${CSP_AVRO_TARGET})

install(TARGETS csp_kafka_adapter
PUBLIC_HEADER DESTINATION include/csp/adapters/kafka
RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR}
Expand Down
3 changes: 3 additions & 0 deletions cpp/csp/adapters/kafka/KafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <csp/adapters/kafka/KafkaPublisher.h>
#include <csp/adapters/utils/MessageWriter.h>
#include <csp/adapters/utils/JSONMessageWriter.h>
#include <csp/adapters/utils/AvroMessageWriter.h>

#include <librdkafka/rdkafkacpp.h>

Expand All @@ -17,6 +18,8 @@ KafkaPublisher::KafkaPublisher( KafkaAdapterManager * mgr, const Dictionary & pr
auto protocol = properties.get<std::string>( "protocol" );
if( protocol == "JSON" )
m_msgWriter = std::make_shared<utils::JSONMessageWriter>( properties );
else if( protocol == "AVRO" )
m_msgWriter = std::make_shared<utils::AvroMessageWriter>( properties );
else if( protocol != "RAW_BYTES" )
CSP_THROW( NotImplemented, "msg protocol " << protocol << " not currently supported for kafka output adapters" );
}
Expand Down
16 changes: 16 additions & 0 deletions cpp/csp/adapters/utils/AvroIncludes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#ifndef _IN_CSP_ADAPTERS_UTILS_AVROINCLUDES_H
#define _IN_CSP_ADAPTERS_UTILS_AVROINCLUDES_H

// Centralized avro includes.

#include <avro/Compiler.hh>
#include <avro/Decoder.hh>
#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include <avro/Node.hh>
#include <avro/Schema.hh>
#include <avro/Stream.hh>
#include <avro/ValidSchema.hh>

#endif
Loading
Loading