From a33070c9429145050e1846f0da4f63ea3e34dbfc Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 16 May 2024 18:35:51 +0800 Subject: [PATCH] Support jvm libhdfs in velox --- .github/workflows/linux-build.yml | 11 ++++++++ CMakeLists.txt | 27 +++++++++++++++---- velox/connectors/hive/HiveConnector.cpp | 4 +-- .../hive/storage_adapters/hdfs/CMakeLists.txt | 12 ++++++++- .../storage_adapters/hdfs/HdfsFileSystem.cpp | 9 +++++++ .../storage_adapters/hdfs/HdfsReadFile.cpp | 11 ++++++++ .../hive/storage_adapters/hdfs/HdfsReadFile.h | 15 ++++++++++- .../storage_adapters/hdfs/HdfsWriteFile.cpp | 23 ++++++++++++++++ .../storage_adapters/hdfs/HdfsWriteFile.h | 5 ++++ .../hdfs/RegisterHdfsFileSystem.cpp | 6 ++--- .../hdfs/tests/CMakeLists.txt | 5 ++++ .../hdfs/tests/HdfsFileSystemTest.cpp | 4 +++ 12 files changed, 120 insertions(+), 12 deletions(-) diff --git a/.github/workflows/linux-build.yml b/.github/workflows/linux-build.yml index 8e619be32d6e..bc0c0f9c0d9f 100644 --- a/.github/workflows/linux-build.yml +++ b/.github/workflows/linux-build.yml @@ -96,6 +96,17 @@ jobs: # Without that, nvcc picks /usr/bin/c++ which is GCC 8 CUDA_FLAGS: "-ccbin /opt/rh/gcc-toolset-9/root/usr/bin" run: | + # Setting HADOOP_HOME + wget https://archive.apache.org/dist/hadoop/core/hadoop-2.10.1/hadoop-2.10.1.tar.gz + tar xf hadoop-2.10.1.tar.gz -C /usr/local/ + export HADOOP_HOME='/usr/local/hadoop-2.10.1' + echo "HADOOP_HOME: $HADOOP_HOME" + + # Setting JAVA_HOME + yum update -y && yum install -y java-1.8.0-openjdk-devel wget + export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk + echo "JAVA_HOME: $JAVA_HOME" + EXTRA_CMAKE_FLAGS=( "-DVELOX_ENABLE_BENCHMARKS=ON" "-DVELOX_ENABLE_ARROW=ON" diff --git a/CMakeLists.txt b/CMakeLists.txt index cb8f6a854a66..5edcae391cca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -243,11 +243,28 @@ if(VELOX_ENABLE_ABFS) endif() if(VELOX_ENABLE_HDFS) - find_library( - LIBHDFS3 - NAMES libhdfs3.so libhdfs3.dylib - HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) - add_definitions(-DVELOX_ENABLE_HDFS3) + if(DEFINED ENV{HADOOP_HOME} AND DEFINED ENV{JAVA_HOME}) + message(STATUS "Using HDFS environment: $ENV{HADOOP_HOME}") + message(STATUS "Using JAVA environment: $ENV{JAVA_HOME}") + set(HADOOP_HOME $ENV{HADOOP_HOME}) + set(JAVA_HOME $ENV{JAVA_HOME}) + + find_library( + LIBHDFS + NAMES libhdfs.so + HINTS "${HADOOP_HOME}/lib/native" REQUIRED) + find_library( + LIBJVM + NAMES libjvm.so + HINTS "${JAVA_HOME}/jre/lib/amd64/server/" REQUIRED) + add_definitions(-DVELOX_ENABLE_HDFS) + else() + find_library( + LIBHDFS3 + NAMES libhdfs3.so libhdfs3.dylib + HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) + add_definitions(-DVELOX_ENABLE_HDFS3) + endif() endif() if(VELOX_ENABLE_PARQUET) diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index c693e8ee0922..a91561f800ff 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -25,7 +25,7 @@ #ifdef VELOX_ENABLE_GCS #include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" // @manual #endif -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" // @manual #endif #ifdef VELOX_ENABLE_S3 @@ -143,7 +143,7 @@ void HiveConnectorFactory::initialize() { #ifdef VELOX_ENABLE_S3 filesystems::registerS3FileSystem(); #endif -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) filesystems::registerHdfsFileSystem(); #endif #ifdef VELOX_ENABLE_GCS diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt index d6363d9e71c5..36651a90853c 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt @@ -19,8 +19,18 @@ add_library(velox_hdfs RegisterHdfsFileSystem.cpp) if(VELOX_ENABLE_HDFS) target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp HdfsWriteFile.cpp) - target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd) + target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS} ${LIBJVM} xsimd + arrow) + target_include_directories(velox_hdfs PRIVATE ${HADOOP_HOME}/include/) + if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) + endif() +endif() +if(VELOX_ENABLE_HDFS3) + target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp + HdfsWriteFile.cpp) + target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd arrow) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index af18aa32e5e9..20e4914ecf96 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -14,7 +14,11 @@ * limitations under the License. */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include +#endif #include #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" @@ -31,12 +35,17 @@ class HdfsFileSystem::Impl { hdfsBuilderSetNameNode(builder, endpoint.host.c_str()); hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data())); hdfsClient_ = hdfsBuilderConnect(builder); +#ifdef VELOX_ENABLE_HDFS3 hdfsFreeBuilder(builder); VELOX_CHECK_NOT_NULL( hdfsClient_, "Unable to connect to HDFS: {}, got error: {}.", endpoint.identity(), hdfsGetLastError()) +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_NOT_NULL( + hdfsClient_, "Unable to connect to HDFS: {}.", endpoint.identity()) +#endif } ~Impl() { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index 9d99420c9d7e..cdd00eca850c 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -16,7 +16,11 @@ #include "HdfsReadFile.h" #include +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include +#endif namespace facebook::velox { @@ -24,6 +28,7 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) : hdfsClient_(hdfs), filePath_(path) { fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data()); if (fileInfo_ == nullptr) { +#ifdef VELOX_ENABLE_HDFS3 auto error = hdfsGetLastError(); auto errMsg = fmt::format( "Unable to get file path info for file: {}. got error: {}", @@ -33,6 +38,12 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) VELOX_FILE_NOT_FOUND_ERROR(errMsg); } VELOX_FAIL(errMsg); +#elif VELOX_ENABLE_HDFS + + auto errMsg = + fmt::format("Unable to get file path info for file: {}", filePath_); + VELOX_FAIL(errMsg); +#endif } } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index 2bd94bf9c8aa..1f0b0d82de5e 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -14,7 +14,11 @@ * limitations under the License. */ +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include +#endif #include "velox/common/file/File.h" namespace facebook::velox { @@ -33,19 +37,28 @@ struct HdfsFile { void open(hdfsFS client, const std::string& path) { client_ = client; handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_NOT_NULL( handle_, "Unable to open file {}. got error: {}", path, hdfsGetLastError()); +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_NOT_NULL(handle_, "Unable to open file {}", path); +#endif } void seek(uint64_t offset) const { + auto result = hdfsSeek(client_, handle_, offset); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( - hdfsSeek(client_, handle_, offset), + result, 0, "Cannot seek through HDFS file, error is : {}", std::string(hdfsGetLastError())); +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_EQ(result, 0, "Cannot seek through HDFS file"); +#endif } int32_t read(char* pos, uint64_t length) const { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp index 60f98a88c972..4b3d6f4627ee 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp @@ -15,7 +15,11 @@ */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include +#endif namespace facebook::velox { HdfsWriteFile::HdfsWriteFile( @@ -38,11 +42,15 @@ HdfsWriteFile::HdfsWriteFile( bufferSize, replication, blockSize); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_NOT_NULL( hdfsFile_, "Failed to open hdfs file: {}, with error: {}", filePath_, std::string(hdfsGetLastError())); +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_NOT_NULL(hdfsFile_, "Failed to open hdfs file: {}", filePath_); +#endif } HdfsWriteFile::~HdfsWriteFile() { @@ -53,11 +61,15 @@ HdfsWriteFile::~HdfsWriteFile() { void HdfsWriteFile::close() { int success = hdfsCloseFile(hdfsClient_, hdfsFile_); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( success, 0, "Failed to close hdfs file: {}", std::string(hdfsGetLastError())); +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_EQ(success, 0, "Failed to close hdfs file."); +#endif hdfsFile_ = nullptr; } @@ -67,8 +79,12 @@ void HdfsWriteFile::flush() { "Cannot flush HDFS file because file handle is null, file path: {}", filePath_); int success = hdfsFlush(hdfsClient_, hdfsFile_); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError())); +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_EQ(success, 0, "Hdfs flush error."); +#endif } void HdfsWriteFile::append(std::string_view data) { @@ -81,11 +97,18 @@ void HdfsWriteFile::append(std::string_view data) { filePath_); int64_t totalWrittenBytes = hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( totalWrittenBytes, data.size(), "Write failure in HDFSWriteFile::append {}", std::string(hdfsGetLastError())); +#elif VELOX_ENABLE_HDFS + VELOX_CHECK_EQ( + totalWrittenBytes, + data.size(), + "Write failure in HDFSWriteFile::append."); +#endif } uint64_t HdfsWriteFile::size() const { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h index 7ed1819cd61f..edb7b2d781e6 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h @@ -15,7 +15,12 @@ */ #pragma once +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include +#endif + #include "velox/common/file/File.h" namespace facebook::velox { diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp index 47734838838f..4df266000b11 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) #include "folly/concurrency/ConcurrentHashMap.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" @@ -25,7 +25,7 @@ namespace facebook::velox::filesystems { -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) std::mutex mtx; std::function #include +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include +#endif #include #include #include "HdfsMiniCluster.h"