Skip to content

Commit

Permalink
Support jvm libhdfs in velox
Browse files (browse the repository at this point in the history)
  • Loading branch information
JkSelf committed May 16, 2024
1 parent 08ffe22 commit a33070c
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 12 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/linux-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ jobs:
# Without that, nvcc picks /usr/bin/c++ which is GCC 8
CUDA_FLAGS: "-ccbin /opt/rh/gcc-toolset-9/root/usr/bin"
run: |
# Setting HADOOP_HOME
wget https://archive.apache.org/dist/hadoop/core/hadoop-2.10.1/hadoop-2.10.1.tar.gz
tar xf hadoop-2.10.1.tar.gz -C /usr/local/
export HADOOP_HOME='/usr/local/hadoop-2.10.1'
echo "HADOOP_HOME: $HADOOP_HOME"
# Setting JAVA_HOME
yum update -y && yum install -y java-1.8.0-openjdk-devel wget
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
echo "JAVA_HOME: $JAVA_HOME"
EXTRA_CMAKE_FLAGS=(
"-DVELOX_ENABLE_BENCHMARKS=ON"
"-DVELOX_ENABLE_ARROW=ON"
Expand Down
27 changes: 22 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,28 @@ if(VELOX_ENABLE_ABFS)
endif()

if(VELOX_ENABLE_HDFS)
find_library(
LIBHDFS3
NAMES libhdfs3.so libhdfs3.dylib
HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
add_definitions(-DVELOX_ENABLE_HDFS3)
if(DEFINED ENV{HADOOP_HOME} AND DEFINED ENV{JAVA_HOME})
message(STATUS "Using HDFS environment: $ENV{HADOOP_HOME}")
message(STATUS "Using JAVA environment: $ENV{JAVA_HOME}")
set(HADOOP_HOME $ENV{HADOOP_HOME})
set(JAVA_HOME $ENV{JAVA_HOME})

find_library(
LIBHDFS
NAMES libhdfs.so
HINTS "${HADOOP_HOME}/lib/native" REQUIRED)
find_library(
LIBJVM
NAMES libjvm.so
HINTS "${JAVA_HOME}/jre/lib/amd64/server/" REQUIRED)
add_definitions(-DVELOX_ENABLE_HDFS)
else()
find_library(
LIBHDFS3
NAMES libhdfs3.so libhdfs3.dylib
HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
add_definitions(-DVELOX_ENABLE_HDFS3)
endif()
endif()

if(VELOX_ENABLE_PARQUET)
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#ifdef VELOX_ENABLE_GCS
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" // @manual
#endif
#ifdef VELOX_ENABLE_HDFS3
#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3)
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" // @manual
#endif
#ifdef VELOX_ENABLE_S3
Expand Down Expand Up @@ -143,7 +143,7 @@ void HiveConnectorFactory::initialize() {
#ifdef VELOX_ENABLE_S3
filesystems::registerS3FileSystem();
#endif
#ifdef VELOX_ENABLE_HDFS3
#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3)
filesystems::registerHdfsFileSystem();
#endif
#ifdef VELOX_ENABLE_GCS
Expand Down
12 changes: 11 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@ add_library(velox_hdfs RegisterHdfsFileSystem.cpp)
if(VELOX_ENABLE_HDFS)
target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp
HdfsWriteFile.cpp)
target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd)
target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS} ${LIBJVM} xsimd
arrow)
target_include_directories(velox_hdfs PRIVATE ${HADOOP_HOME}/include/)
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
endif()

if(VELOX_ENABLE_HDFS3)
target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp
HdfsWriteFile.cpp)
target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd arrow)
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
* limitations under the License.
*/
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#elif VELOX_ENABLE_HDFS
#include <hdfs.h>
#endif
#include <mutex>
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
Expand All @@ -31,12 +35,17 @@ class HdfsFileSystem::Impl {
hdfsBuilderSetNameNode(builder, endpoint.host.c_str());
hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
hdfsClient_ = hdfsBuilderConnect(builder);
#ifdef VELOX_ENABLE_HDFS3
hdfsFreeBuilder(builder);
VELOX_CHECK_NOT_NULL(
hdfsClient_,
"Unable to connect to HDFS: {}, got error: {}.",
endpoint.identity(),
hdfsGetLastError())
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_NOT_NULL(
hdfsClient_, "Unable to connect to HDFS: {}.", endpoint.identity())
#endif
}

~Impl() {
Expand Down
11 changes: 11 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>
#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#elif VELOX_ENABLE_HDFS
#include <hdfs.h>
#endif

namespace facebook::velox {

HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
: hdfsClient_(hdfs), filePath_(path) {
fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data());
if (fileInfo_ == nullptr) {
#ifdef VELOX_ENABLE_HDFS3
auto error = hdfsGetLastError();
auto errMsg = fmt::format(
"Unable to get file path info for file: {}. got error: {}",
Expand All @@ -33,6 +38,12 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
VELOX_FILE_NOT_FOUND_ERROR(errMsg);
}
VELOX_FAIL(errMsg);
#elif VELOX_ENABLE_HDFS

auto errMsg =
fmt::format("Unable to get file path info for file: {}", filePath_);
VELOX_FAIL(errMsg);
#endif
}
}

Expand Down
15 changes: 14 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
* limitations under the License.
*/

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#elif VELOX_ENABLE_HDFS
#include <hdfs.h>
#endif
#include "velox/common/file/File.h"

namespace facebook::velox {
Expand All @@ -33,19 +37,28 @@ struct HdfsFile {
void open(hdfsFS client, const std::string& path) {
client_ = client;
handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
#ifdef VELOX_ENABLE_HDFS3
VELOX_CHECK_NOT_NULL(
handle_,
"Unable to open file {}. got error: {}",
path,
hdfsGetLastError());
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_NOT_NULL(handle_, "Unable to open file {}", path);
#endif
}

void seek(uint64_t offset) const {
auto result = hdfsSeek(client_, handle_, offset);
#ifdef VELOX_ENABLE_HDFS3
VELOX_CHECK_EQ(
hdfsSeek(client_, handle_, offset),
result,
0,
"Cannot seek through HDFS file, error is : {}",
std::string(hdfsGetLastError()));
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_EQ(result, 0, "Cannot seek through HDFS file");
#endif
}

int32_t read(char* pos, uint64_t length) const {
Expand Down
23 changes: 23 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/

#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#elif VELOX_ENABLE_HDFS
#include <hdfs.h>
#endif

namespace facebook::velox {
HdfsWriteFile::HdfsWriteFile(
Expand All @@ -38,11 +42,15 @@ HdfsWriteFile::HdfsWriteFile(
bufferSize,
replication,
blockSize);
#ifdef VELOX_ENABLE_HDFS3
VELOX_CHECK_NOT_NULL(
hdfsFile_,
"Failed to open hdfs file: {}, with error: {}",
filePath_,
std::string(hdfsGetLastError()));
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_NOT_NULL(hdfsFile_, "Failed to open hdfs file: {}", filePath_);
#endif
}

HdfsWriteFile::~HdfsWriteFile() {
Expand All @@ -53,11 +61,15 @@ HdfsWriteFile::~HdfsWriteFile() {

void HdfsWriteFile::close() {
int success = hdfsCloseFile(hdfsClient_, hdfsFile_);
#ifdef VELOX_ENABLE_HDFS3
VELOX_CHECK_EQ(
success,
0,
"Failed to close hdfs file: {}",
std::string(hdfsGetLastError()));
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_EQ(success, 0, "Failed to close hdfs file.");
#endif
hdfsFile_ = nullptr;
}

Expand All @@ -67,8 +79,12 @@ void HdfsWriteFile::flush() {
"Cannot flush HDFS file because file handle is null, file path: {}",
filePath_);
int success = hdfsFlush(hdfsClient_, hdfsFile_);
#ifdef VELOX_ENABLE_HDFS3
VELOX_CHECK_EQ(
success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError()));
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_EQ(success, 0, "Hdfs flush error.");
#endif
}

void HdfsWriteFile::append(std::string_view data) {
Expand All @@ -81,11 +97,18 @@ void HdfsWriteFile::append(std::string_view data) {
filePath_);
int64_t totalWrittenBytes =
hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
#ifdef VELOX_ENABLE_HDFS3
VELOX_CHECK_EQ(
totalWrittenBytes,
data.size(),
"Write failure in HDFSWriteFile::append {}",
std::string(hdfsGetLastError()));
#elif VELOX_ENABLE_HDFS
VELOX_CHECK_EQ(
totalWrittenBytes,
data.size(),
"Write failure in HDFSWriteFile::append.");
#endif
}

uint64_t HdfsWriteFile::size() const {
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
#pragma once

#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#elif VELOX_ENABLE_HDFS
#include <hdfs.h>
#endif

#include "velox/common/file/File.h"

namespace facebook::velox {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#ifdef VELOX_ENABLE_HDFS3
#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3)
#include "folly/concurrency/ConcurrentHashMap.h"

#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
Expand All @@ -25,7 +25,7 @@

namespace facebook::velox::filesystems {

#ifdef VELOX_ENABLE_HDFS3
#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3)
std::mutex mtx;

std::function<std::shared_ptr<
Expand Down Expand Up @@ -95,7 +95,7 @@ hdfsWriteFileSinkGenerator() {
#endif

void registerHdfsFileSystem() {
#ifdef VELOX_ENABLE_HDFS3
#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3)
registerFileSystem(HdfsFileSystem::isHdfsFile, hdfsFileSystemGenerator());
dwio::common::FileSink::registerFactory(hdfsWriteFileSinkGenerator());
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,10 @@ target_link_libraries(
gtest_main
gmock)

if(VELOX_ENABLE_HDFS)
target_include_directories(velox_hdfs_file_test
PRIVATE ${HADOOP_HOME}/include/)
endif()

target_compile_options(velox_hdfs_file_test
PRIVATE -Wno-deprecated-declarations)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include <boost/format.hpp>
#include <gmock/gmock-matchers.h>
#ifdef VELOX_ENABLE_HDFS3
#include <hdfs/hdfs.h>
#elif VELOX_ENABLE_HDFS
#include <hdfs.h>
#endif
#include <atomic>
#include <random>
#include "HdfsMiniCluster.h"
Expand Down

0 comments on commit a33070c

Please sign in to comment.