forked from apache/doris
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature](avro) Support Apache Avro file format (apache#19990)
support read avro file by hdfs() or s3() . ```sql select * from s3( "uri" = "http://127.0.0.1:9312/test2/person.avro", "ACCESS_KEY" = "ak", "SECRET_KEY" = "sk", "FORMAT" = "avro"); +--------+--------------+-------------+-----------------+ | name | boolean_type | double_type | long_type | +--------+--------------+-------------+-----------------+ | Alyssa | 1 | 10.0012 | 100000000221133 | | Ben | 0 | 5555.999 | 4009990000 | | lisi | 0 | 5992225.999 | 9099933330 | +--------+--------------+-------------+-----------------+ select * from hdfs( "uri" = "hdfs://127.0.0.1:9000/input/person2.avro", "fs.defaultFS" = "hdfs://127.0.0.1:9000", "hadoop.username" = "doris", "format" = "avro"); +--------+--------------+-------------+-----------+ | name | boolean_type | double_type | long_type | +--------+--------------+-------------+-----------+ | Alyssa | 1 | 8888.99999 | 89898989 | +--------+--------------+-------------+-----------+ ``` current avro reader only support common data type, the complex data types will be supported later.
- Loading branch information
1 parent
2a14fca
commit e8d4841
Showing
31 changed files
with
1,286 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include "avro_jni_reader.h" | ||
|
||
#include <map> | ||
#include <ostream> | ||
|
||
#include "runtime/descriptors.h" | ||
#include "runtime/types.h" | ||
|
||
namespace doris::vectorized { | ||
|
||
AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, | ||
const TFileScanRangeParams& params, | ||
const std::vector<SlotDescriptor*>& file_slot_descs) | ||
: _file_slot_descs(file_slot_descs), _state(state), _profile(profile), _params(params) {} | ||
|
||
AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params, | ||
const TFileRangeDesc& range, | ||
const std::vector<SlotDescriptor*>& file_slot_descs) | ||
: _file_slot_descs(file_slot_descs), _profile(profile), _params(params), _range(range) {} | ||
|
||
AvroJNIReader::~AvroJNIReader() = default; | ||
|
||
Status AvroJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { | ||
RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof)); | ||
if (*eof) { | ||
RETURN_IF_ERROR(_jni_connector->close()); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Status AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, | ||
std::unordered_set<std::string>* missing_cols) { | ||
for (auto& desc : _file_slot_descs) { | ||
name_to_type->emplace(desc->col_name(), desc->type()); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Status AvroJNIReader::init_fetch_table_reader( | ||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { | ||
_colname_to_value_range = colname_to_value_range; | ||
std::ostringstream required_fields; | ||
std::ostringstream columns_types; | ||
std::vector<std::string> column_names; | ||
int index = 0; | ||
for (auto& desc : _file_slot_descs) { | ||
std::string field = desc->col_name(); | ||
column_names.emplace_back(field); | ||
std::string type = JniConnector::get_hive_type(desc->type()); | ||
if (index == 0) { | ||
required_fields << field; | ||
columns_types << type; | ||
} else { | ||
required_fields << "," << field; | ||
columns_types << "#" << type; | ||
} | ||
index++; | ||
} | ||
|
||
TFileType::type type = _params.file_type; | ||
std::map<String, String> required_param = { | ||
{"required_fields", required_fields.str()}, | ||
{"columns_types", columns_types.str()}, | ||
{"file_type", std::to_string(type)}, | ||
{"is_get_table_schema", "false"}, | ||
{"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}}; | ||
switch (type) { | ||
case TFileType::FILE_HDFS: | ||
required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value)); | ||
break; | ||
case TFileType::FILE_S3: | ||
required_param.insert(_params.properties.begin(), _params.properties.end()); | ||
break; | ||
default: | ||
Status::InternalError("unsupported file reader type: {}", std::to_string(type)); | ||
} | ||
required_param.insert(_params.properties.begin(), _params.properties.end()); | ||
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner", | ||
required_param, column_names); | ||
RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range)); | ||
return _jni_connector->open(_state, _profile); | ||
} | ||
|
||
Status AvroJNIReader::init_fetch_table_schema_reader() { | ||
std::map<String, String> required_param = {{"uri", _range.path}, | ||
{"file_type", std::to_string(_params.file_type)}, | ||
{"is_get_table_schema", "true"}}; | ||
|
||
required_param.insert(_params.properties.begin(), _params.properties.end()); | ||
_jni_connector = | ||
std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner", required_param); | ||
return _jni_connector->open(nullptr, _profile); | ||
} | ||
|
||
Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names, | ||
std::vector<TypeDescriptor>* col_types) { | ||
std::string table_schema_str; | ||
RETURN_IF_ERROR(_jni_connector->get_table_schema(table_schema_str)); | ||
|
||
rapidjson::Document document; | ||
document.Parse(table_schema_str.c_str()); | ||
if (document.IsArray()) { | ||
for (int i = 0; i < document.Size(); ++i) { | ||
rapidjson::Value& column_schema = document[i]; | ||
col_names->push_back(column_schema["name"].GetString()); | ||
col_types->push_back(convert_to_doris_type(column_schema)); | ||
} | ||
} | ||
return _jni_connector->close(); | ||
} | ||
|
||
TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) { | ||
::doris::TPrimitiveType::type schema_type = | ||
static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt()); | ||
switch (schema_type) { | ||
case TPrimitiveType::INT: | ||
case TPrimitiveType::STRING: | ||
case TPrimitiveType::BIGINT: | ||
case TPrimitiveType::BOOLEAN: | ||
case TPrimitiveType::DOUBLE: | ||
case TPrimitiveType::FLOAT: | ||
return TypeDescriptor(thrift_to_type(schema_type)); | ||
case TPrimitiveType::ARRAY: { | ||
TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY); | ||
list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); | ||
return list_type; | ||
} | ||
case TPrimitiveType::MAP: { | ||
TypeDescriptor map_type(PrimitiveType::TYPE_MAP); | ||
|
||
// The default type of AVRO MAP structure key is STRING | ||
map_type.add_sub_type(PrimitiveType::TYPE_STRING); | ||
map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); | ||
return map_type; | ||
} | ||
default: | ||
return TypeDescriptor(PrimitiveType::INVALID_TYPE); | ||
} | ||
} | ||
|
||
TypeDescriptor AvroJNIReader::convert_complex_type( | ||
const rapidjson::Document::ConstObject child_schema) { | ||
::doris::TPrimitiveType::type child_schema_type = | ||
static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt()); | ||
return TypeDescriptor(thrift_to_type(child_schema_type)); | ||
} | ||
|
||
} // namespace doris::vectorized |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#pragma once | ||
|
||
#include <rapidjson/document.h> | ||
#include <stddef.h> | ||
|
||
#include <memory> | ||
#include <string> | ||
#include <unordered_map> | ||
#include <unordered_set> | ||
#include <vector> | ||
|
||
#include "common/status.h" | ||
#include "exec/olap_common.h" | ||
#include "vec/exec/format/generic_reader.h" | ||
#include "vec/exec/jni_connector.h" | ||
|
||
namespace doris { | ||
class RuntimeProfile; | ||
|
||
class RuntimeState; | ||
|
||
class SlotDescriptor; | ||
namespace vectorized { | ||
class Block; | ||
} // namespace vectorized | ||
struct TypeDescriptor; | ||
} // namespace doris | ||
|
||
namespace doris::vectorized { | ||
|
||
/** | ||
* Read avro-format file | ||
*/ | ||
class AvroJNIReader : public GenericReader { | ||
ENABLE_FACTORY_CREATOR(AvroJNIReader); | ||
|
||
public: | ||
/** | ||
* Call java side by jni to get table data. | ||
*/ | ||
AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, | ||
const std::vector<SlotDescriptor*>& file_slot_descs); | ||
|
||
/** | ||
* Call java side by jni to get table schema. | ||
*/ | ||
AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params, | ||
const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs); | ||
|
||
~AvroJNIReader() override; | ||
|
||
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; | ||
|
||
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, | ||
std::unordered_set<std::string>* missing_cols) override; | ||
|
||
Status init_fetch_table_reader( | ||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); | ||
|
||
Status init_fetch_table_schema_reader(); | ||
|
||
Status get_parsed_schema(std::vector<std::string>* col_names, | ||
std::vector<TypeDescriptor>* col_types) override; | ||
|
||
TypeDescriptor convert_to_doris_type(const rapidjson::Value& column_schema); | ||
|
||
TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject child_schema); | ||
|
||
private: | ||
const std::vector<SlotDescriptor*>& _file_slot_descs; | ||
RuntimeState* _state; | ||
RuntimeProfile* _profile; | ||
const TFileScanRangeParams _params; | ||
const TFileRangeDesc _range; | ||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; | ||
std::unique_ptr<JniConnector> _jni_connector; | ||
}; | ||
|
||
} // namespace doris::vectorized |
Oops, something went wrong.