[Feature] Support reading Parquet-format data from HDFS using libhdfs3 (apache#5686)

Add a new library so that the Backend can read data from HDFS without a broker.
This patch includes libhdfs3.a, which can read files on HDFS, and makes
reading Parquet data from HDFS possible. Building on this, more file formats
on HDFS, and other metadata sources, can be supported in the future.
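
For orientation, the raw libhdfs3 call sequence that the new reader wraps looks roughly like the following. This is a minimal sketch using only the C API calls that appear in the patch below; the namenode address and file path are hypothetical:

    #include <hdfs/hdfs.h>
    #include <fcntl.h>   // O_RDONLY
    #include <cstdio>

    int main() {
        // Build and open a connection to the namenode (hypothetical address).
        hdfsBuilder* builder = hdfsNewBuilder();
        hdfsBuilderSetNameNode(builder, "hdfs://namenode-host:8020");
        hdfsFS fs = hdfsBuilderConnect(builder);
        if (fs == nullptr) return 1;

        // Open the file read-only and read one chunk.
        hdfsFile file = hdfsOpenFile(fs, "/user/doris/data.parquet", O_RDONLY, 0, 0, 0);
        if (file == nullptr) { hdfsDisconnect(fs); return 1; }

        char buf[4096];
        tSize n = hdfsRead(fs, file, buf, sizeof(buf));
        printf("read %d bytes\n", (int)n);

        // Close the file first, then tear down the connection.
        hdfsCloseFile(fs, file);
        hdfsDisconnect(fs);
        return 0;
    }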
pengxiangyu authored Apr 24, 2021
1 parent a1fa392 commit 29a3fa1
Showing 11 changed files with 508 additions and 1 deletion.
16 changes: 16 additions & 0 deletions be/CMakeLists.txt
@@ -294,6 +294,18 @@ set_target_properties(aws-s2n PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib
add_library(minzip STATIC IMPORTED)
set_target_properties(minzip PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libminizip.a)

add_library(hdfs3 STATIC IMPORTED)
set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libhdfs3.a)

add_library(gsasl STATIC IMPORTED)
set_target_properties(gsasl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libgsasl.a)

add_library(xml2 STATIC IMPORTED)
set_target_properties(xml2 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libxml2.a)

add_library(lzma STATIC IMPORTED)
set_target_properties(lzma PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/liblzma.a)

find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)

# Check if functions are supported in this platform. All flags will generated
@@ -494,6 +506,10 @@ set(DORIS_DEPENDENCIES
odbc
cctz
minzip
hdfs3
gsasl
xml2
lzma
${AWS_LIBS}
${WL_END_GROUP}
)
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
@@ -40,6 +40,7 @@ set(EXEC_FILES
hash_join_node.cpp
hash_join_node_ir.cpp
hash_table.cpp
hdfs_file_reader.cpp
local_file_reader.cpp
merge_node.cpp
merge_join_node.cpp
15 changes: 15 additions & 0 deletions be/src/exec/broker_scanner.cpp
@@ -36,6 +36,14 @@
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/tuple.h"
#include "exprs/expr.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "exec/plain_text_line_reader.h"
#include "exec/hdfs_file_reader.h"
#include "exec/local_file_reader.h"
#include "exec/broker_reader.h"
#include "exec/decompressor.h"
#include "util/utf8_check.h"

namespace doris {
@@ -159,6 +167,13 @@ Status BrokerScanner::open_file_reader() {
_cur_file_reader = file_reader;
break;
}
case TFileType::FILE_HDFS: {
HdfsFileReader* file_reader = new HdfsFileReader(
range.hdfs_params, range.path, start_offset);
RETURN_IF_ERROR(file_reader->open());
_cur_file_reader = file_reader;
break;
}
case TFileType::FILE_BROKER: {
BrokerReader* broker_reader =
new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
195 changes: 195 additions & 0 deletions be/src/exec/hdfs_file_reader.cpp
@@ -0,0 +1,195 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs_file_reader.h"

#include <sys/stat.h>
#include <unistd.h>

#include "common/logging.h"

namespace doris {
HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params,
const std::string& path, int64_t start_offset)
: _hdfs_params(hdfs_params), _path(path), _current_offset(start_offset),
_file_size(-1), _hdfs_fs(nullptr), _hdfs_file(nullptr) {
std::stringstream namenode_ss;
namenode_ss << "hdfs://" << _hdfs_params.host<< ":" << _hdfs_params.port;
_namenode = namenode_ss.str();
}

HdfsFileReader::~HdfsFileReader() {
close();
}

Status HdfsFileReader::connect() {
hdfsBuilder* hdfs_builder = hdfsNewBuilder();
hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
// set hdfs user
if (_hdfs_params.__isset.user) {
hdfsBuilderSetUserName(hdfs_builder, _hdfs_params.user.c_str());
}
// set kerberos conf
if (_hdfs_params.__isset.kerb_principal) {
hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str());
}
if (_hdfs_params.__isset.kerb_ticket_cache_path) {
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _hdfs_params.kerb_ticket_cache_path.c_str());
}
// set token
if (_hdfs_params.__isset.token) {
hdfsBuilderSetToken(hdfs_builder, _hdfs_params.token.c_str());
}
// set other conf
if (_hdfs_params.__isset.hdfs_conf) {
for (const THdfsConf& conf : _hdfs_params.hdfs_conf) {
hdfsBuilderConfSetStr(hdfs_builder, conf.key.c_str(), conf.value.c_str());
}
}
_hdfs_fs = hdfsBuilderConnect(hdfs_builder);
if (_hdfs_fs == nullptr) {
std::stringstream ss;
ss << "connect failed. " << _namenode;
return Status::InternalError(ss.str());
}
return Status::OK();
}

Status HdfsFileReader::open() {
if (!closed()) {
close();
}
RETURN_IF_ERROR(connect());
_hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
if (_hdfs_file == nullptr) {
std::stringstream ss;
ss << "open file failed. " << _namenode << _path;
return Status::InternalError(ss.str());
}
LOG(INFO) << "open file. " << _namenode << _path;
return seek(_current_offset);
}

void HdfsFileReader::close() {
if (!closed()) {
if (_hdfs_file != nullptr && _hdfs_fs != nullptr) {
std::stringstream ss;
ss << "close hdfs file: " << _namenode << _path;
LOG(INFO) << ss.str();
// If the hdfs file was valid, the memory associated with it will
// be freed at the end of this call, even if there was an I/O error.
hdfsCloseFile(_hdfs_fs, _hdfs_file);
}
if (_hdfs_fs != nullptr) {
// Even if there is an error, the resources associated with the hdfsFS will be freed.
hdfsDisconnect(_hdfs_fs);
}
}
_hdfs_file = nullptr;
_hdfs_fs = nullptr;
}

bool HdfsFileReader::closed() {
return _hdfs_file == nullptr || _hdfs_fs == nullptr;
}

// Read all remaining bytes from the current offset to end of file.
Status HdfsFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
int64_t remaining = size() - _current_offset;
if (remaining <= 0) {
buf->reset();
*length = 0;
return Status::OK();
}
bool eof = false;
*length = remaining;
buf->reset(new uint8_t[remaining]);
return read(buf->get(), length, &eof);
}

Status HdfsFileReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
int64_t bytes_read = 0;
RETURN_IF_ERROR(readat(_current_offset, (int64_t)*buf_len, &bytes_read, buf));
*buf_len = bytes_read;
*eof = (*buf_len == 0);
return Status::OK();
}

Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
if (position != _current_offset) {
int ret = hdfsSeek(_hdfs_fs, _hdfs_file, position);
if (ret != 0) { // check fseek return value
std::stringstream ss;
ss << "hdfsSeek failed. " << _namenode << _path;
return Status::InternalError(ss.str());
}
}

*bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
if (*bytes_read < 0) {
std::stringstream ss;
ss << "Read hdfs file failed. " << _namenode << _path;
return Status::InternalError(ss.str());
}
_current_offset += *bytes_read; // advance the current offset past the bytes just read
return Status::OK();
}

int64_t HdfsFileReader::size() {
if (_file_size == -1) {
bool need_init_fs = false;
if (_hdfs_fs == nullptr) {
need_init_fs = true;
if (!connect().ok()) {
return -1;
}
}
hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
if (file_info == nullptr) {
LOG(WARNING) << "get path info failed: " << _namenode << _path;
close();
return -1;
}
_file_size = file_info->mSize;
hdfsFreeFileInfo(file_info, 1);
if (need_init_fs) {
close();
}
}
return _file_size;
}

Status HdfsFileReader::seek(int64_t position) {
int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
if (res != 0) {
char err_buf[64];
std::stringstream ss;
ss << "Seek to offset failed. offset=" << position
<< ", error=" << strerror_r(errno, err_buf, 64);
return Status::InternalError(ss.str());
}
return Status::OK();
}

Status HdfsFileReader::tell(int64_t* position) {
*position = _current_offset;
return Status::OK();
}

} // namespace doris
61 changes: 61 additions & 0 deletions be/src/exec/hdfs_file_reader.h
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <hdfs/hdfs.h>

#include "exec/file_reader.h"

#include "gen_cpp/PlanNodes_types.h"

namespace doris {

class HdfsFileReader : public FileReader {
public:
HdfsFileReader(THdfsParams hdfs_params, const std::string& path, int64_t start_offset);
virtual ~HdfsFileReader();

virtual Status open() override;

// Read content into 'buf'; 'buf_len' is the max size of the buffer.
// Returns OK on success, with 'buf_len' set to the number of bytes read.
// On end of file, 'eof' is set to true and 'buf_len' is set to zero.
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
virtual void close() override;
virtual bool closed() override;

private:
Status connect();
private:
THdfsParams _hdfs_params;
std::string _namenode;
std::string _path;
int64_t _current_offset;
int64_t _file_size;
hdfsFS _hdfs_fs;
hdfsFile _hdfs_file;
};

} // namespace doris
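
As a usage illustration for the interface above, a caller might drive the reader as follows. This is a hedged sketch: the helper function, host, port, and path are hypothetical, and error propagation relies on Doris' RETURN_IF_ERROR macro:

    #include "exec/hdfs_file_reader.h"

    namespace doris {

    // Sketch: open an HDFS file via the new reader and read one chunk.
    Status read_hdfs_example() {
        THdfsParams params;
        params.__set_host("namenode-host");   // hypothetical namenode
        params.__set_port(8020);

        HdfsFileReader reader(params, "/user/doris/data.parquet", /*start_offset=*/0);
        RETURN_IF_ERROR(reader.open());

        uint8_t buf[4096];
        size_t buf_len = sizeof(buf);
        bool eof = false;
        RETURN_IF_ERROR(reader.read(buf, &buf_len, &eof));  // buf_len = bytes read

        reader.close();  // optional; the destructor also calls close()
        return Status::OK();
    }

    } // namespace doris

The explicit close() is optional because ~HdfsFileReader() already calls close(), as the implementation above shows.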
15 changes: 15 additions & 0 deletions be/src/exec/parquet_scanner.cpp
@@ -32,6 +32,16 @@
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/tuple.h"
#include "exec/parquet_reader.h"
#include "exprs/expr.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "exec/hdfs_file_reader.h"
#include "exec/local_file_reader.h"
#include "exec/broker_reader.h"
#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
#include "exec/parquet_reader.h"

namespace doris {

@@ -117,6 +127,11 @@ Status ParquetScanner::open_next_reader() {
file_reader.reset(new LocalFileReader(range.path, range.start_offset));
break;
}
case TFileType::FILE_HDFS: {
file_reader.reset(new HdfsFileReader(
range.hdfs_params, range.path, range.start_offset));
break;
}
case TFileType::FILE_BROKER: {
int64_t file_size = 0;
// for compatibility
16 changes: 16 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
@@ -108,6 +108,21 @@ enum TFileFormatType {
FORMAT_JSON,
}

struct THdfsConf {
1: required string key
2: required string value
}

struct THdfsParams {
1: optional string host
2: optional i32 port
3: optional string user
4: optional string kerb_principal
5: optional string kerb_ticket_cache_path
6: optional string token
7: optional list<THdfsConf> hdfs_conf
}

// One broker range information.
struct TBrokerRangeDesc {
1: required Types.TFileType file_type
@@ -134,6 +149,7 @@ struct TBrokerRangeDesc {
// it's useful when format_type == FORMAT_JSON
14: optional bool num_as_string;
15: optional bool fuzzy_parse;
16: optional THdfsParams hdfs_params
}

struct TBrokerScanRangeParams {
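
On the plan side, a scan range reaches the new HDFS code path when its descriptor carries these parameters. A hedged sketch of populating them from C++ with the thrift-generated types (all values hypothetical; required fields are assigned directly, optional fields via the generated __set_ helpers):

    #include "gen_cpp/PlanNodes_types.h"

    using namespace doris;

    TBrokerRangeDesc make_hdfs_range() {
        THdfsConf conf;
        conf.key = "dfs.replication";        // any extra HDFS client setting (hypothetical)
        conf.value = "3";

        THdfsParams params;
        params.__set_host("namenode-host");  // hypothetical namenode
        params.__set_port(8020);
        params.__set_user("doris");
        params.__set_hdfs_conf({conf});

        TBrokerRangeDesc range;
        range.file_type = TFileType::FILE_HDFS;  // required field
        range.__set_hdfs_params(params);         // optional field 16 added by this patch
        return range;
    }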