Skip to content

Commit

Permalink
[Feature] Add native format writer to access StarRocks data bypass BE…
Browse files Browse the repository at this point in the history
… Server.

Signed-off-by: plotor <zhenchao.wang@hotmail.com>
  • Loading branch information
plotor committed Nov 7, 2024
1 parent 3b8e683 commit 1c009e6
Show file tree
Hide file tree
Showing 38 changed files with 588 additions and 123 deletions.
40 changes: 23 additions & 17 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES}
set_target_properties(aws-cpp-sdk-core PROPERTIES INTERFACE_LINK_LIBRARIES AWS::aws-crt-cpp)

if (STARROCKS_JIT_ENABLE)
set(STARROCKS_DEPENDENCIES
set(STARROCKS_DEPENDENCIES
${STARROCKS_DEPENDENCIES}
${WL_START_GROUP}
${LLVM_LIBRARIES}
Expand Down Expand Up @@ -909,7 +909,7 @@ set(STARROCKS_DEPENDENCIES
arrow_flight
gRPC::grpc
gRPC::grpc++
jemalloc # required by arrow
# jemalloc # required by arrow. FIXME(zhenchao, 2024-10-30): confirm that removing it from this list causes no problems
parquet
orc
cctz
Expand Down Expand Up @@ -967,9 +967,15 @@ endif()

# Add all external dependencies. They should come after the starrocks libs.

set(STARROCKS_STD_DEPENDENCIES
-static-libstdc++
-static-libgcc
-lresolv
)

set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS}
${STARROCKS_LINK_LIBS}
${STARROCKS_DEPENDENCIES}
${STARROCKS_STD_DEPENDENCIES}
hdfs
jvm
)
Expand All @@ -978,42 +984,42 @@ set(BUILD_FOR_SANITIZE "OFF")
# Add sanitize static link flags or jemalloc
if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR "${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
message("use jemalloc")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS jemalloc)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} jemalloc)
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libsan)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libsan)
else()
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libasan)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libasan)
endif()
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "LSAN")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-liblsan)
set(BUILD_FOR_SANITIZE "ON")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-liblsan)
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
message("use jemalloc")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libubsan jemalloc)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libubsan jemalloc)
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libtsan)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libtsan)
set(BUILD_FOR_SANITIZE "ON")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif()

if (NOT BUILD_FORMAT_LIB)
# skip the STARROCKS_MEMORY_DEPENDENCIES_LIBS only when BUILD_FORMAT_LIB=ON
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} ${STARROCKS_MEMORY_DEPENDENCIES_LIBS})
endif()

if (NOT ("${MAKE_TEST}" STREQUAL "ON" AND "${BUILD_FOR_SANITIZE}" STREQUAL "ON"))
# In other words, turn to dynamic link when MAKE_TEST and BUILD_TYPE == *SAN
# otherwise do static link gcc's lib
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libstdc++ -static-libgcc)
endif()

set(STARROCKS_DYNAMIC_DEPENDENCIES_LIBS
-lbfd -liberty -lc -lm -ldl -rdynamic -pthread -Wl,-wrap=__cxa_throw
)

set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS}
${WL_LINK_STATIC} -lbfd
${WL_LINK_DYNAMIC} -lresolv -liberty -lc -lm -ldl -rdynamic -pthread -Wl,-wrap=__cxa_throw
${WL_LINK_STATIC}
${WL_LINK_DYNAMIC}
${STARROCKS_DYNAMIC_DEPENDENCIES_LIBS}
)

# link gcov if WITH_GCOV is on
Expand Down
4 changes: 4 additions & 0 deletions be/src/column/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class Field {
_name(rhs._name),
_type(rhs._type),
_sub_fields(rhs._sub_fields ? new std::vector<Field>(*rhs._sub_fields) : nullptr),
_length(rhs._length),
_short_key_length(rhs._short_key_length),
_flags(rhs._flags),
_uid(rhs._uid) {}
Expand All @@ -95,6 +96,7 @@ class Field {
_name(std::move(rhs._name)),
_type(std::move(rhs._type)),
_sub_fields(rhs._sub_fields),
_length(rhs._length),
_short_key_length(rhs._short_key_length),
_flags(rhs._flags),
_uid(rhs._uid) {
Expand All @@ -108,6 +110,7 @@ class Field {
_name = rhs._name;
_type = rhs._type;
_agg_method = rhs._agg_method;
_length = rhs._length;
_agg_state_desc = rhs._agg_state_desc;
_short_key_length = rhs._short_key_length;
_flags = rhs._flags;
Expand All @@ -123,6 +126,7 @@ class Field {
_name = std::move(rhs._name);
_type = std::move(rhs._type);
_agg_method = rhs._agg_method;
_length = rhs._length;
_agg_state_desc = rhs._agg_state_desc;
_short_key_length = rhs._short_key_length;
_flags = rhs._flags;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,9 @@ Status ChunkPredicateBuilder<E, Type>::normalize_predicate(const SlotDescriptor&
RETURN_IF_ERROR((normalize_not_in_or_not_equal_predicate<SlotType, RangeValueType, Negative>(slot, range)));
RETURN_IF_ERROR(normalize_is_null_predicate(slot));
// Must handle join runtime filter last
RETURN_IF_ERROR((normalize_join_runtime_filter<SlotType, RangeValueType, Negative>(slot, range)));
if (nullptr != _opts.runtime_filters) {
RETURN_IF_ERROR((normalize_join_runtime_filter<SlotType, RangeValueType, Negative>(slot, range)));
}

return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exprs/function_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ FunctionContext* FunctionContext::create_context(RuntimeState* state, MemPool* p
ctx->_mem_pool = pool;
ctx->_return_type = return_type;
ctx->_arg_types = arg_types;
#if !defined(BUILD_FORMAT_LIB)
ctx->_jvm_udaf_ctxs = std::make_unique<JavaUDAFContext>();
#endif
return ctx;
}

Expand All @@ -52,7 +54,9 @@ FunctionContext* FunctionContext::create_context(RuntimeState* state, MemPool* p
ctx->_mem_pool = pool;
ctx->_return_type = return_type;
ctx->_arg_types = arg_types;
#if !defined(BUILD_FORMAT_LIB)
ctx->_jvm_udaf_ctxs = std::make_unique<JavaUDAFContext>();
#endif
ctx->_is_distinct = is_distinct;
ctx->_is_asc_order = is_asc_order;
ctx->_nulls_first = nulls_first;
Expand Down
26 changes: 9 additions & 17 deletions be/src/starrocks_format/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


set(CMAKE_VERBOSE_MAKEFILE ON)

set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/starrocks_format")
Expand Down Expand Up @@ -49,34 +48,27 @@ set(STARROCKS_LIBS
add_library(hdfs_so SHARED IMPORTED GLOBAL)
set_target_properties(hdfs_so PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/hadoop/lib/native/libhdfs.so)

set(STARROCKS_THIRDPARTY_DEPENDENCIES
SET(STARROCKS_FORMAT_LIBS
${STARROCKS_LIBS}
${STARROCKS_DEPENDENCIES}
${STARROCKS_STD_DEPENDENCIES}
${STARROCKS_MEMORY_DEPENDENCIES_LIBS}
${STARROCKS_DYNAMIC_DEPENDENCIES_LIBS}
hdfs_so
)
message(STATUS "STARROCKS_FORMAT_LIBS is ${STARROCKS_FORMAT_LIBS}")

# only build starrocks_format when MAKE_TEST is off
if (NOT ${MAKE_TEST} STREQUAL "ON")
if(NOT ${MAKE_TEST} STREQUAL "ON")

add_library(starrocks_format SHARED
starrocks_lib.cpp
)
starrocks_lib.cpp
)

# This permits libraries loaded by dlopen to link to the symbols in the program.
target_link_libraries(starrocks_format
# -Wl,--whole-archive
${STARROCKS_LIBS}
# -Wl,--no-whole-archive
${STARROCKS_DEPENDENCIES}
${STARROCKS_STD_DEPENDENCIES}
${STARROCKS_DYNAMIC_DEPENDENCIES_LIBS}
hdfs_so
)
target_link_libraries(starrocks_format ${STARROCKS_FORMAT_LIBS})

install(DIRECTORY DESTINATION ${OUTPUT_DIR}/format-lib/)

install(TARGETS starrocks_format
DESTINATION ${OUTPUT_DIR}/format-lib/)
install(TARGETS starrocks_format DESTINATION ${OUTPUT_DIR}/format-lib/)

endif()
27 changes: 6 additions & 21 deletions be/src/starrocks_format/starrocks_lib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
// limitations under the License.

#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <glog/logging.h>
#include <unistd.h>

#include <filesystem>
#include <fstream>

#include "common/config.h"
#include "fs/fs_s3.h"
#include "runtime/time_types.h"
#include "storage/lake/fixed_location_provider.h"
#include "storage/lake/tablet_manager.h"
#include "storage/olap_define.h"
#include "util/timezone_utils.h"
Expand All @@ -33,24 +30,21 @@ namespace starrocks::lake {
static bool _starrocks_format_inited = false;
Aws::SDKOptions aws_sdk_options;

lake::TabletManager* _lake_tablet_manager = nullptr;

void starrocks_format_initialize(void) {
setenv("STARROCKS_HOME", "./", 0);
setenv("UDF_RUNTIME_DIR", "./", 0);

if (!_starrocks_format_inited) {
fprintf(stderr, "starrocks format module start to initialize\n");
// load config file
std::string conffile = std::filesystem::current_path();
conffile += "/starrocks.conf";
const char* config_file_path = conffile.c_str();
std::string conf_file = std::filesystem::current_path();
conf_file += "/starrocks.conf";
const char* config_file_path = conf_file.c_str();
std::ifstream ifs(config_file_path);
if (!ifs.good()) {
config_file_path = nullptr;
}
if (!starrocks::config::init(config_file_path)) {
LOG(WARNING) << "read config file:" << config_file_path << " failed!";
LOG(WARNING) << "load config file " << config_file_path << " failed!";
return;
}

Expand All @@ -60,24 +54,15 @@ void starrocks_format_initialize(void) {

TimezoneUtils::init_time_zones();

auto lake_location_provider = std::make_shared<FixedLocationProvider>("");
_lake_tablet_manager = new lake::TabletManager(lake_location_provider, config::lake_metadata_cache_limit);
LOG(INFO) << "starrocks format module has been initialized successfully";
LOG(INFO) << "init starrocks format module successfully";
_starrocks_format_inited = true;
} else {
LOG(INFO) << "starrocks format module has already been initialized";
}
}

void starrocks_format_shutdown(void) {
if (_starrocks_format_inited) {
LOG(INFO) << "starrocks format module start to deinitialize";
Aws::ShutdownAPI(aws_sdk_options);
SAFE_DELETE(_lake_tablet_manager);
// SAFE_DELETE(_lake_update_manager);
LOG(INFO) << "starrocks format module has been deinitialized successfully";
} else {
LOG(INFO) << "starrocks format module has already been deinitialized";
LOG(INFO) << "shutdown starrocks format module successfully";
}
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/starrocks_format/starrocks_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

namespace starrocks::lake {

extern lake::TabletManager* _lake_tablet_manager;

void starrocks_format_initialize(void);

void starrocks_format_shutdown(void);
Expand Down
10 changes: 10 additions & 0 deletions be/src/types/date_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "gutil/strings/substitute.h"
#include "types/timestamp_value.h"
#include "date_value.h"


namespace starrocks {

Expand All @@ -41,6 +43,14 @@ int32_t DateValue::to_date_literal() const {
return year * 10000 + month * 100 + day;
}

// Converts this date to a millisecond count relative to the UNIX epoch:
// scale the julian day number to seconds, rebase by
// timestamp::UNIX_EPOCH_SECONDS, then scale seconds to milliseconds.
int64_t DateValue::to_unixtime() const {
    // Widen before multiplying so the day-to-second scaling happens in 64 bits.
    const int64_t julian_secs = static_cast<int64_t>(_julian) * SECS_PER_DAY;
    const int64_t epoch_secs = julian_secs - timestamp::UNIX_EPOCH_SECONDS;
    return epoch_secs * 1000L;
}

void DateValue::from_date_literal(int64_t date_literal) {
_julian = date::from_date_literal(date_literal);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/types/date_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class DateValue {

int32_t to_date_literal() const;

int64_t to_unixtime() const;

void from_date_literal(int64_t date_literal);

bool from_date_literal_with_check(int64_t date_literal);
Expand Down
7 changes: 7 additions & 0 deletions be/src/types/timestamp_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,13 @@ int64_t TimestampValue::to_unixtime() const {
return result;
}

// Time-zone-aware variant of to_unixtime(): takes the value produced by the
// parameterless overload and subtracts the UTC offset of `ctz`, scaled by
// MILLIS_PER_SEC.
// NOTE(review): the scaling suggests TimezoneUtils::to_utc_offset() yields
// seconds and to_unixtime() yields milliseconds -- confirm against their
// definitions.
int64_t TimestampValue::to_unixtime(const cctz::time_zone& ctz) const {
    const int64_t utc_offset = TimezoneUtils::to_utc_offset(ctz);
    return to_unixtime() - utc_offset * MILLIS_PER_SEC;
}

bool TimestampValue::from_unixtime(int64_t second, const std::string& timezone) {
cctz::time_zone ctz;
if (!TimezoneUtils::find_cctz_time_zone(timezone, ctz)) {
Expand Down
1 change: 1 addition & 0 deletions be/src/types/timestamp_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class TimestampValue {

int64_t to_unix_second() const;
int64_t to_unixtime() const;
int64_t to_unixtime(const cctz::time_zone& ctz) const;

bool from_unixtime(int64_t second, const std::string& timezone);
void from_unixtime(int64_t second, const cctz::time_zone& ctz);
Expand Down
4 changes: 3 additions & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ if [ ${BUILD_BE} -eq 1 ] && [ ${BUILD_FORMAT_LIB} -eq 1 ]; then
exit 1
fi
if [ ${BUILD_FORMAT_LIB} -eq 1 ]; then
echo "do not build java extendsions when build format-lib."
echo "do not build java extensions when build format-lib."
BUILD_JAVA_EXT=OFF
fi

Expand Down Expand Up @@ -373,9 +373,11 @@ if [ ${BUILD_BE} -eq 1 ] || [ ${BUILD_FORMAT_LIB} -eq 1 ] ; then
echo "Error: cmake is not found"
exit 1
fi

# When building the starrocks format lib, USE_STAROS must be ON
if [ ${BUILD_FORMAT_LIB} -eq 1 ] ; then
USE_STAROS=ON
WITH_TENANN=OFF # FIXME by zhenchao, must be OFF?
fi

CMAKE_BUILD_TYPE=$BUILD_TYPE
Expand Down
2 changes: 1 addition & 1 deletion fe/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ https://github.com/checkstyle/checkstyle/blob/master/src/main/resources/google_c
<property name="fileExtensions" value="java"/>

<module name="RegexpHeader">
<property name="headerFile" value="checkstyle-header.txt"/>
<property name="headerFile" value="${checkstyle.header.file}"/>
<property name="fileExtensions" value="java"/>
</module>

Expand Down
3 changes: 2 additions & 1 deletion fe/fe-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ under the License.
</dependency>
</dependencies>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<configLocation>${project.basedir}/../checkstyle.xml</configLocation>
<headerLocation>${project.basedir}/../checkstyle-header.txt</headerLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,8 @@ under the License.
</dependency>
</dependencies>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<configLocation>${project.basedir}/../checkstyle.xml</configLocation>
<headerLocation>${project.basedir}/../checkstyle-header.txt</headerLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
Expand Down
Loading

0 comments on commit 1c009e6

Please sign in to comment.