Skip to content

Commit

Permalink
Add jvm version libhdfs support
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Jun 20, 2024
1 parent 1a50a8a commit bc84e4f
Show file tree
Hide file tree
Showing 11 changed files with 2,100 additions and 97 deletions.
4 changes: 0 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ if(VELOX_ENABLE_ABFS)
endif()

if(VELOX_ENABLE_HDFS)
find_library(
LIBHDFS3
NAMES libhdfs3.so libhdfs3.dylib
HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
add_definitions(-DVELOX_ENABLE_HDFS3)
endif()

Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ add_library(velox_hdfs RegisterHdfsFileSystem.cpp)

if(VELOX_ENABLE_HDFS)
target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp
HdfsWriteFile.cpp)
target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd)
HdfsWriteFile.cpp hdfs_internal.cc)
target_link_libraries(velox_hdfs Folly::folly xsimd arrow)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
38 changes: 26 additions & 12 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
* limitations under the License.
*/
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include <hdfs/hdfs.h>
#include <arrow/status.h>
#include <mutex>
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/hdfs_internal.h"
#include "velox/core/Config.h"

namespace facebook::velox::filesystems {
Expand All @@ -27,21 +28,27 @@ class HdfsFileSystem::Impl {
public:
// Keep config here for possible use in the future.
explicit Impl(const Config* config, const HdfsServiceEndpoint& endpoint) {
auto builder = hdfsNewBuilder();
hdfsBuilderSetNameNode(builder, endpoint.host.c_str());
hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
hdfsClient_ = hdfsBuilderConnect(builder);
hdfsFreeBuilder(builder);
auto status = ConnectLibHdfs(&driver_);
if (!status.ok()) {
LOG(ERROR) << "ConnectLibHdfs failed ";
}

// connect to HDFS with the builder object
hdfsBuilder* builder = driver_->NewBuilder();
driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
driver_->BuilderSetForceNewInstance(builder);
hdfsClient_ = driver_->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
hdfsClient_,
"Unable to connect to HDFS: {}, got error: {}.",
endpoint.identity(),
hdfsGetLastError())
"Unable to connect to HDFS: {}, got error",
endpoint.identity())
}

~Impl() {
LOG(INFO) << "Disconnecting HDFS file system";
int disconnectResult = hdfsDisconnect(hdfsClient_);
int disconnectResult = driver_->Disconnect(hdfsClient_);
;
if (disconnectResult != 0) {
LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: "
<< errno;
Expand All @@ -52,8 +59,13 @@ class HdfsFileSystem::Impl {
return hdfsClient_;
}

// Returns the libhdfs shim through which all HDFS C-API calls are made.
// Raw pointer; callers must not delete it.
arrow::io::internal::LibHdfsShim* hdfsShim() {
return driver_;
}

private:
hdfsFS hdfsClient_;
arrow::io::internal::LibHdfsShim* driver_;
};

HdfsFileSystem::HdfsFileSystem(
Expand All @@ -77,13 +89,15 @@ std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
path.remove_prefix(index);
}

return std::make_unique<HdfsReadFile>(impl_->hdfsClient(), path);
return std::make_unique<HdfsReadFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
}

// Opens `path` for writing on the connected HDFS cluster.
// The libhdfs shim and the hdfsFS client are both handed to the write
// file, which performs every HDFS call through the shim.
std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
    std::string_view path,
    const FileOptions& /*unused*/) {
  return std::make_unique<HdfsWriteFile>(
      impl_->hdfsShim(), impl_->hdfsClient(), path);
}

bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
Expand Down
24 changes: 10 additions & 14 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,32 @@

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>
#include <hdfs/hdfs.h>

namespace facebook::velox {

// Opens a read-only view of an HDFS file.
// @param driver The loaded libhdfs shim; all HDFS calls go through it.
// @param hdfs An already-connected hdfsFS client (not owned here).
// @param path Absolute HDFS path of the file to read.
// Fails (VELOX_FAIL) when the path's metadata cannot be fetched.
HdfsReadFile::HdfsReadFile(
    filesystems::arrow::io::internal::LibHdfsShim* driver,
    hdfsFS hdfs,
    const std::string_view path)
    : driver_(driver), hdfsClient_(hdfs), filePath_(path) {
  // Fetch file metadata up front; size() and bounds checks rely on it.
  fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data());
  if (fileInfo_ == nullptr) {
    // NOTE(review): the previous libhdfs3 path inspected hdfsGetLastError()
    // and raised VELOX_FILE_NOT_FOUND_ERROR for FileNotFoundException;
    // with the shim that distinction is lost and every failure becomes a
    // generic VELOX_FAIL — confirm callers do not depend on the
    // file-not-found error class.
    auto errMsg =
        fmt::format("Unable to get file path info for file: {}.", filePath_);
    VELOX_FAIL(errMsg);
  }
}

HdfsReadFile::~HdfsReadFile() {
  // Release the hdfsFileInfo fetched in the constructor (exactly once,
  // through the shim) to avoid a memory leak.
  driver_->FreeFileInfo(fileInfo_, 1);
}

void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
if (!file_->handle_) {
file_->open(hdfsClient_, filePath_);
file_->open(driver_, hdfsClient_, filePath_);
}
file_->seek(offset);
uint64_t totalBytesRead = 0;
Expand Down
35 changes: 20 additions & 15 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,43 @@
* limitations under the License.
*/

#include <hdfs/hdfs.h>
#include "velox/common/file/File.h"
#include "velox/connectors/hive/storage_adapters/hdfs/hdfs.h"
#include "velox/connectors/hive/storage_adapters/hdfs/hdfs_internal.h"

namespace facebook::velox {

struct HdfsFile {
filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS client_;
hdfsFile handle_;

HdfsFile() : client_(nullptr), handle_(nullptr) {}
HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
~HdfsFile() {
if (handle_ && hdfsCloseFile(client_, handle_) == -1) {
if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
LOG(ERROR) << "Unable to close file, errno: " << errno;
}
}

void open(hdfsFS client, const std::string& path) {
void open(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS client,
const std::string& path) {
driver_ = driver;
client_ = client;
handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
VELOX_CHECK_NOT_NULL(
handle_,
"Unable to open file {}. got error: {}",
path,
hdfsGetLastError());
handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
VELOX_CHECK_NOT_NULL(handle_, "Unable to open file {}.", path);
}

void seek(uint64_t offset) const {
VELOX_CHECK_EQ(
hdfsSeek(client_, handle_, offset),
driver_->Seek(client_, handle_, offset),
0,
"Cannot seek through HDFS file, error is : {}",
std::string(hdfsGetLastError()));
"Cannot seek through HDFS file.");
}

int32_t read(char* pos, uint64_t length) const {
auto bytesRead = hdfsRead(client_, handle_, pos, length);
auto bytesRead = driver_->Read(client_, handle_, pos, length);
VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.")
return bytesRead;
}
Expand All @@ -60,7 +61,10 @@ struct HdfsFile {
*/
class HdfsReadFile final : public ReadFile {
public:
explicit HdfsReadFile(hdfsFS hdfs, std::string_view path);
explicit HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
std::string_view path);
~HdfsReadFile() override;

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
Expand All @@ -86,6 +90,7 @@ class HdfsReadFile final : public ReadFile {
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;
void checkFileReadParameters(uint64_t offset, uint64_t length) const;

filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS hdfsClient_;
hdfsFileInfo* fileInfo_;
std::string filePath_;
Expand Down
40 changes: 15 additions & 25 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,30 @@
*/

#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#include <hdfs/hdfs.h>

namespace facebook::velox {
// Creates (or opens for overwrite) an HDFS file for writing.
// @param driver The loaded libhdfs shim used for every HDFS call.
// @param hdfsClient An already-connected hdfsFS client (not owned here).
// @param path Absolute HDFS path of the file to write.
// @param bufferSize/replication/blockSize Pass 0 to use the cluster's
// configured defaults.
// Throws (VELOX_CHECK_NOT_NULL) when the file cannot be opened.
HdfsWriteFile::HdfsWriteFile(
    filesystems::arrow::io::internal::LibHdfsShim* driver,
    hdfsFS hdfsClient,
    std::string_view path,
    int bufferSize,
    short replication,
    int blockSize)
    : driver_(driver), hdfsClient_(hdfsClient), filePath_(path) {
  // Ensure the parent directory exists before opening the file; Exists()
  // returns -1 when the path is absent.
  auto pos = filePath_.rfind("/");
  auto parentDir = filePath_.substr(0, pos + 1);
  if (driver_->Exists(hdfsClient_, parentDir.c_str()) == -1) {
    // Best-effort: if creation fails, OpenFile below fails and is checked.
    driver_->MakeDirectory(hdfsClient_, parentDir.c_str());
  }

  hdfsFile_ = driver_->OpenFile(
      hdfsClient_,
      filePath_.c_str(),
      O_WRONLY,
      bufferSize,
      replication,
      blockSize);
  VELOX_CHECK_NOT_NULL(hdfsFile_, "Failed to open hdfs file: {}.", filePath_);
}

HdfsWriteFile::~HdfsWriteFile() {
Expand All @@ -52,12 +48,8 @@ HdfsWriteFile::~HdfsWriteFile() {
}

// Closes the HDFS file handle, flushing remaining data; the handle is
// nulled afterwards so the destructor/flush paths can detect closure.
void HdfsWriteFile::close() {
  int success = driver_->CloseFile(hdfsClient_, hdfsFile_);
  VELOX_CHECK_EQ(success, 0, "Failed to close hdfs file.");
  hdfsFile_ = nullptr;
}

Expand All @@ -66,9 +58,8 @@ void HdfsWriteFile::flush() {
hdfsFile_,
"Cannot flush HDFS file because file handle is null, file path: {}",
filePath_);
int success = hdfsFlush(hdfsClient_, hdfsFile_);
VELOX_CHECK_EQ(
success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError()));
int success = driver_->Flush(hdfsClient_, hdfsFile_);
VELOX_CHECK_EQ(success, 0, "Hdfs flush error.");
}

void HdfsWriteFile::append(std::string_view data) {
Expand All @@ -79,20 +70,19 @@ void HdfsWriteFile::append(std::string_view data) {
hdfsFile_,
"Cannot append to HDFS file because file handle is null, file path: {}",
filePath_);
int64_t totalWrittenBytes =
hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
int64_t totalWrittenBytes = driver_->Write(
hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
VELOX_CHECK_EQ(
totalWrittenBytes,
data.size(),
"Write failure in HDFSWriteFile::append {}",
std::string(hdfsGetLastError()));
"Write failure in HDFSWriteFile::append.");
}

uint64_t HdfsWriteFile::size() const {
auto fileInfo = hdfsGetPathInfo(hdfsClient_, filePath_.c_str());
auto fileInfo = driver_->GetPathInfo(hdfsClient_, filePath_.c_str());
uint64_t size = fileInfo->mSize;
// should call hdfsFreeFileInfo to avoid memory leak
hdfsFreeFileInfo(fileInfo, 1);
driver_->FreeFileInfo(fileInfo, 1);
return size;
}

Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
#pragma once

#include <hdfs/hdfs.h>
#include "velox/common/file/File.h"
#include "velox/connectors/hive/storage_adapters/hdfs/hdfs.h"
#include "velox/connectors/hive/storage_adapters/hdfs/hdfs_internal.h"

namespace facebook::velox {

Expand All @@ -34,6 +35,7 @@ class HdfsWriteFile : public WriteFile {
/// @param blockSize Size of block - pass 0 if you want to use the
/// default configured values.
HdfsWriteFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfsClient,
std::string_view path,
int bufferSize = 0,
Expand All @@ -55,6 +57,7 @@ class HdfsWriteFile : public WriteFile {
void close() override;

private:
filesystems::arrow::io::internal::LibHdfsShim* driver_;
/// The configured hdfs filesystem handle.
hdfsFS hdfsClient_;
/// The hdfs file handle for write.
Expand Down
Loading

0 comments on commit bc84e4f

Please sign in to comment.