
[Feature]Support reading hudi MOR table in snapshot mode (StarRocks#6780)
miomiocat authored Aug 22, 2022
1 parent 7f6658e commit 4f4b473
Showing 27 changed files with 934 additions and 97 deletions.
46 changes: 45 additions & 1 deletion be/src/connector/hive_connector.cpp
@@ -6,6 +6,7 @@
#include "exec/vectorized/hdfs_scanner_orc.h"
#include "exec/vectorized/hdfs_scanner_parquet.h"
#include "exec/vectorized/hdfs_scanner_text.h"
#include "exec/vectorized/jni_scanner.h"
#include "exprs/expr.h"
#include "storage/chunk_helper.h"

@@ -258,7 +259,50 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {

HdfsScanner* scanner = nullptr;
auto format = scan_range.file_format;
if (format == THdfsFileFormat::PARQUET) {
if (dynamic_cast<const HudiTableDescriptor*>(_tuple_desc->table_desc()) && scan_range.hudi_mor_table) {
const auto* hudi_table = dynamic_cast<const HudiTableDescriptor*>(_hive_table);
auto* partition_desc = hudi_table->get_partition(scan_range.partition_id);
std::string partition_full_path = partition_desc->location();

std::string required_fields;
for (auto slot : _tuple_desc->slots()) {
required_fields.append(slot->col_name());
required_fields.append(",");
}
required_fields = required_fields.substr(0, required_fields.size() - 1);

std::string delta_file_paths;
if (!scan_range.hudi_logs.empty()) {
for (const std::string& log : scan_range.hudi_logs) {
delta_file_paths.append(partition_full_path + "/" + log);
delta_file_paths.append(",");
}
delta_file_paths = delta_file_paths.substr(0, delta_file_paths.size() - 1);
}

std::string data_file_path;
if (scan_range.relative_path.empty()) {
data_file_path = "";
} else {
data_file_path = partition_full_path + "/" + scan_range.relative_path;
}

std::map<std::string, std::string> jni_scanner_params;
jni_scanner_params["base_path"] = hudi_table->get_base_path();
jni_scanner_params["hive_column_names"] = hudi_table->get_hive_column_names();
jni_scanner_params["hive_column_types"] = hudi_table->get_hive_column_types();
jni_scanner_params["required_fields"] = required_fields;
jni_scanner_params["instant_time"] = hudi_table->get_instant_time();
jni_scanner_params["delta_file_paths"] = delta_file_paths;
jni_scanner_params["data_file_path"] = data_file_path;
jni_scanner_params["data_file_length"] = std::to_string(scan_range.file_length);
jni_scanner_params["serde"] = hudi_table->get_serde_lib();
jni_scanner_params["input_format"] = hudi_table->get_input_format();

std::string scanner_factory_class = "com/starrocks/hudi/reader/HudiSliceScannerFactory";

scanner = _pool.add(new JniScanner(scanner_factory_class, jni_scanner_params));
} else if (format == THdfsFileFormat::PARQUET) {
scanner = _pool.add(new HdfsParquetScanner());
} else if (format == THdfsFileFormat::ORC) {
scanner = _pool.add(new HdfsOrcScanner());
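Note on the new Hudi branch above: every value in jni_scanner_params is handed to the JNI scanner as a flat string, with required_fields, delta_file_paths, hive_column_names and hive_column_types comma-separated, and data_file_path left empty for log-only slices. The Java reader behind com/starrocks/hudi/reader/HudiSliceScannerFactory is not part of this hunk, so the following is only a rough sketch of how such a scanner could unpack those parameters; the class and field names are illustrative, not the actual implementation.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only: unpacking the parameters built in HiveDataSource::_init_scanner.
    final class HudiSliceParams {
        final String basePath;
        final String instantTime;
        final List<String> requiredFields;
        final List<String> deltaFilePaths; // empty when the slice carries no log files
        final String dataFilePath;         // empty string for a log-only slice
        final long dataFileLength;

        HudiSliceParams(Map<String, String> params) {
            basePath = params.get("base_path");
            instantTime = params.get("instant_time");
            requiredFields = splitCsv(params.get("required_fields"));
            deltaFilePaths = splitCsv(params.get("delta_file_paths"));
            dataFilePath = params.get("data_file_path");
            dataFileLength = Long.parseLong(params.get("data_file_length"));
        }

        // The C++ side strips the trailing comma, so a plain split is sufficient.
        private static List<String> splitCsv(String csv) {
            return (csv == null || csv.isEmpty()) ? Collections.emptyList() : Arrays.asList(csv.split(","));
        }
    }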
1 change: 1 addition & 0 deletions be/src/connector/hive_connector.h
@@ -5,6 +5,7 @@
#include "column/vectorized_fwd.h"
#include "connector/connector.h"
#include "exec/vectorized/hdfs_scanner.h"

namespace starrocks {

namespace connector {
12 changes: 8 additions & 4 deletions be/src/exec/vectorized/hdfs_scanner.cpp
@@ -137,10 +136,6 @@ Status HdfsScanner::open(RuntimeState* runtime_state) {
if (_opened) {
return Status::OK();
}
CHECK(_file == nullptr) << "File has already been opened";
ASSIGN_OR_RETURN(_raw_file, _scanner_params.fs->new_random_access_file(_scanner_params.path));
_file = std::make_unique<RandomAccessFile>(
std::make_shared<CountedSeekableInputStream>(_raw_file->stream(), &_stats), _raw_file->filename());
_build_scanner_context();
auto status = do_open(runtime_state);
if (status.ok()) {
@@ -180,6 +176,14 @@ uint64_t HdfsScanner::exit_pending_queue() {
return _pending_queue_sw.reset();
}

Status HdfsScanner::open_random_access_file() {
CHECK(_file == nullptr) << "File has already been opened";
ASSIGN_OR_RETURN(_raw_file, _scanner_params.fs->new_random_access_file(_scanner_params.path));
_file = std::make_unique<RandomAccessFile>(
std::make_shared<CountedSeekableInputStream>(_raw_file->stream(), &_stats), _raw_file->filename());
return Status::OK();
}

void HdfsScanner::update_hdfs_counter(HdfsScanProfile* profile) {
static const char* const kHdfsIOProfileSectionPrefix = "HdfsIO";
if (_file == nullptr) return;
3 changes: 3 additions & 0 deletions be/src/exec/vectorized/hdfs_scanner.h
@@ -244,6 +244,9 @@ class HdfsScanner {
// how long it stays inside pending queue.
uint64_t exit_pending_queue();

protected:
Status open_random_access_file();

private:
bool _opened = false;
std::atomic<bool> _closed = false;
1 change: 1 addition & 0 deletions be/src/exec/vectorized/hdfs_scanner_orc.cpp
@@ -411,6 +411,7 @@ bool OrcRowReaderFilter::filterOnPickStringDictionary(
}

Status HdfsOrcScanner::do_open(RuntimeState* runtime_state) {
RETURN_IF_ERROR(open_random_access_file());
auto input_stream = std::make_unique<ORCHdfsFileStream>(_file.get(), _scanner_params.scan_ranges[0]->file_length);
SCOPED_RAW_TIMER(&_stats.reader_init_ns);
std::unique_ptr<orc::Reader> reader;
1 change: 1 addition & 0 deletions be/src/exec/vectorized/hdfs_scanner_parquet.cpp
@@ -51,6 +51,7 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
}

Status HdfsParquetScanner::do_open(RuntimeState* runtime_state) {
RETURN_IF_ERROR(open_random_access_file());
// create file reader
_reader = std::make_shared<parquet::FileReader>(runtime_state->chunk_size(), _file.get(),
_scanner_params.scan_ranges[0]->file_length);
1 change: 1 addition & 0 deletions be/src/exec/vectorized/hdfs_scanner_text.cpp
@@ -130,6 +130,7 @@ Status HdfsTextScanner::do_init(RuntimeState* runtime_state, const HdfsScannerPa
}

Status HdfsTextScanner::do_open(RuntimeState* runtime_state) {
RETURN_IF_ERROR(open_random_access_file());
RETURN_IF_ERROR(_create_or_reinit_reader());
SCOPED_RAW_TIMER(&_stats.reader_init_ns);
for (const auto slot : _scanner_params.materialize_slots) {
29 changes: 29 additions & 0 deletions be/src/runtime/descriptors.cpp
@@ -148,6 +148,35 @@ HudiTableDescriptor::HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
auto* partition = pool->add(new HdfsPartitionDescriptor(tdesc.hudiTable, entry.second));
_partition_id_to_desc_map[entry.first] = partition;
}
_hudi_instant_time = tdesc.hudiTable.instant_time;
_hive_column_names = tdesc.hudiTable.hive_column_names;
_hive_column_types = tdesc.hudiTable.hive_column_types;
_input_format = tdesc.hudiTable.input_format;
_serde_lib = tdesc.hudiTable.serde_lib;
}

const std::string& HudiTableDescriptor::get_base_path() const {
return _table_location;
}

const std::string& HudiTableDescriptor::get_instant_time() const {
return _hudi_instant_time;
}

const std::string& HudiTableDescriptor::get_hive_column_names() const {
return _hive_column_names;
}

const std::string& HudiTableDescriptor::get_hive_column_types() const {
return _hive_column_types;
}

const std::string& HudiTableDescriptor::get_input_format() const {
return _input_format;
}

const std::string& HudiTableDescriptor::get_serde_lib() const {
return _serde_lib;
}

HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool) : TableDescriptor(tdesc) {}
13 changes: 13 additions & 0 deletions be/src/runtime/descriptors.h
@@ -230,6 +230,19 @@ class HudiTableDescriptor : public HiveTableDescriptor {
HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool);
~HudiTableDescriptor() override = default;
bool has_partition() const override { return true; }
const std::string& get_base_path() const;
const std::string& get_instant_time() const;
const std::string& get_hive_column_names() const;
const std::string& get_hive_column_types() const;
const std::string& get_input_format() const;
const std::string& get_serde_lib() const;

private:
std::string _hudi_instant_time;
std::string _hive_column_names;
std::string _hive_column_types;
std::string _input_format;
std::string _serde_lib;
};

// ===========================================
6 changes: 5 additions & 1 deletion build.sh
@@ -328,7 +328,11 @@ if [ ${BUILD_BE} -eq 1 ]; then
cp -r -p ${STARROCKS_HOME}/be/output/udf/include/* ${STARROCKS_OUTPUT}/udf/include/
cp -r -p ${STARROCKS_HOME}/java-extensions/jdbc-bridge/target/starrocks-jdbc-bridge-jar-with-dependencies.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages
cp -r -p ${STARROCKS_HOME}/java-extensions/udf-extensions/target/udf-extensions-jar-with-dependencies.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages
cp -r -p ${STARROCKS_HOME}/java-extensions/java-utils/target/starrocks-java-utils-jar-with-dependencies.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages
cp -r -p ${STARROCKS_HOME}/java-extensions/java-utils/target/starrocks-java-utils.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages
cp -r -p ${STARROCKS_HOME}/java-extensions/jni-connector/target/starrocks-jni-connector.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages
cp -r -p ${STARROCKS_HOME}/java-extensions/hudi-reader/target/hudi-reader-lib ${STARROCKS_OUTPUT}/be/lib/
cp -r -p ${STARROCKS_HOME}/java-extensions/hudi-reader/target/starrocks-hudi-reader.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages
cp -r -p ${STARROCKS_HOME}/java-extensions/hudi-reader/target/starrocks-hudi-reader.jar ${STARROCKS_OUTPUT}/be/lib/hudi-reader-lib
cp -r -p ${STARROCKS_THIRDPARTY}/installed/hadoop/share/hadoop/common ${STARROCKS_OUTPUT}/be/lib/hadoop/
cp -r -p ${STARROCKS_THIRDPARTY}/installed/hadoop/share/hadoop/hdfs ${STARROCKS_OUTPUT}/be/lib/hadoop/
cp -r -p ${STARROCKS_THIRDPARTY}/installed/hadoop/lib/native ${STARROCKS_OUTPUT}/be/lib/hadoop/
96 changes: 83 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/catalog/HudiTable.java
@@ -33,11 +33,13 @@
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,11 +49,15 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.external.HiveMetaStoreTableUtils.isInternalCatalog;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromTypeString;
import static org.apache.hudi.common.table.HoodieTableConfig.CREATE_SCHEMA;

/**
* Currently, we depend on Hive metastore to obtain table/partition path and statistics.
Expand All @@ -74,6 +80,11 @@ public class HudiTable extends Table implements HiveMetaStoreTable {
public static final String HUDI_TABLE_PRE_COMBINE_FIELD = "hudi.table.preCombineField";
public static final String HUDI_BASE_PATH = "hudi.table.base.path";
public static final String HUDI_TABLE_BASE_FILE_FORMAT = "hudi.table.base.file.format";
public static final String HUDI_TABLE_AVRO_RECORD_SCHEMA = "hudi.table.avro.record.schema";
public static final String HUDI_TABLE_SERDE_LIB = "hudi.table.serde.lib";
public static final String HUDI_TABLE_INPUT_FOAMT = "hudi.table.input.format";
public static final String HUDI_TABLE_COLUMN_NAMES = "hudi.table.column.names";
public static final String HUDI_TABLE_COLUMN_TYPES = "hudi.table.column.types";
public static final String HUDI_DB = "database";
public static final String HUDI_TABLE = "table";
public static final String HUDI_RESOURCE = "resource";
@@ -125,6 +136,10 @@ public String getHudiBasePath() {
return hudiProperties.get(HUDI_BASE_PATH);
}

public String getTableCreateSchema() {
return hudiProperties.get(HUDI_TABLE_AVRO_RECORD_SCHEMA);
}

@Override
public String getTableName() {
return table;
@@ -254,33 +269,52 @@ private void initProps(Map<String, String> properties) throws DdlException {
throw new DdlException("Resource [" + resourceName + "] is not hudi resource");
}
}

this.resourceName = resourceName;

org.apache.hadoop.hive.metastore.api.Table metastoreTable =
HudiTable.validate(resourceName, this.db, this.table, this.fullSchema);
GlobalStateMgr.getCurrentState().getHiveRepository().getTable(resourceName, this.db, this.table);
Schema tableSchema = HudiTable.loadHudiSchema(metastoreTable);

for (Column column : this.fullSchema) {
if (!column.isAllowNull()) {
throw new DdlException(
"Hudi extern table does not support no-nullable column: [" + column.getName() + "]");
}
Schema.Field hudiColumn = tableSchema.getField(column.getName());
if (hudiColumn == null) {
throw new DdlException("Column [" + column.getName() + "] not exists in hudi.");
}
// for each column in hudi schema, we should transfer hudi column type to starrocks type
// after that, we should check column type whether is same
// Only internal catalog like hudi external table need to validate column type
if (HiveMetaStoreTableUtils.isInternalCatalog(resourceName) &&
!validColumnType(hudiColumn.schema(), column.getType())) {
throw new DdlException("Can not convert hudi column type [" + hudiColumn.schema().toString() + "] " +
"to starrocks type [" + column.getPrimitiveType() + "], column name: " + column.getName()
+ ", starrocks type should be "
+ HiveMetaStoreTableUtils.convertHudiTableColumnType(hudiColumn.schema()).toSql());
}
}

String hudiBasePath = metastoreTable.getSd().getLocation();
if (!Strings.isNullOrEmpty(hudiBasePath)) {
hudiProperties.put(HUDI_BASE_PATH, hudiBasePath);
}
String serdeLib = metastoreTable.getSd().getSerdeInfo().getSerializationLib();
if (!Strings.isNullOrEmpty(serdeLib)) {
hudiProperties.put(HUDI_TABLE_SERDE_LIB, serdeLib);
}
String inputFormat = metastoreTable.getSd().getInputFormat();
if (!Strings.isNullOrEmpty(inputFormat)) {
hudiProperties.put(HUDI_TABLE_INPUT_FOAMT, inputFormat);
}

Configuration conf = new Configuration();
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build();
HoodieTableConfig hudiTableConfig = metaClient.getTableConfig();

Schema tableSchema;
try {
tableSchema = loadHudiSchema(metastoreTable);
} catch (Exception e) {
throw new DdlException("Cannot get hudi table schema.");
}

org.apache.hudi.common.model.HoodieTableType hudiTableType = hudiTableConfig.getTableType();
if (hudiTableType == org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ) {
throw new DdlException("MERGE_ON_READ type of hudi table is NOT supported.");
}
hudiProperties.put(HUDI_TABLE_TYPE, hudiTableType.name());

Option<String[]> hudiTablePrimaryKey = hudiTableConfig.getRecordKeyFields();
Expand All @@ -296,6 +330,32 @@ private void initProps(Map<String, String> properties) throws DdlException {
HoodieFileFormat hudiBaseFileFormat = hudiTableConfig.getBaseFileFormat();
hudiProperties.put(HUDI_TABLE_BASE_FILE_FORMAT, hudiBaseFileFormat.name());

StringBuilder columnNamesBuilder = new StringBuilder();
StringBuilder columnTypesBuilder = new StringBuilder();
List<FieldSchema> allFields = metastoreTable.getSd().getCols();
allFields.addAll(metastoreTable.getPartitionKeys());

hudiProperties.put(HUDI_TABLE_AVRO_RECORD_SCHEMA, metaClient.getTableConfig().getString(CREATE_SCHEMA));

boolean isFirst = true;
for (Schema.Field hudiField : tableSchema.getFields()) {
if (!isFirst) {
columnNamesBuilder.append(",");
columnTypesBuilder.append(",");
}
Optional<FieldSchema> field = allFields.stream()
.filter(f -> f.getName().equals(hudiField.name().toLowerCase(Locale.ROOT))).findFirst();
if (!field.isPresent()) {
throw new DdlException("Hudi column [" + hudiField.name() + "] not exists in hive metastore.");
}
TypeInfo fieldInfo = getTypeInfoFromTypeString(field.get().getType());
columnNamesBuilder.append(field.get().getName());
columnTypesBuilder.append(fieldInfo.getTypeName());
isFirst = false;
}
hudiProperties.put(HUDI_TABLE_COLUMN_NAMES, columnNamesBuilder.toString());
hudiProperties.put(HUDI_TABLE_COLUMN_TYPES, columnTypesBuilder.toString());

Option<String[]> hudiPartitionFields = hudiTableConfig.getPartitionFields();
if (hudiPartitionFields.isPresent()) {
for (String partField : hudiPartitionFields.get()) {
@@ -323,7 +383,6 @@ private void initProps(Map<String, String> properties) throws DdlException {
this.dataColumnNames.add(hudiField.name());
}
}

if (!copiedProps.isEmpty()) {
throw new DdlException("Unknown table properties: " + copiedProps);
}
@@ -498,6 +557,17 @@ public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> p
tHudiTable.putToPartitions(partitionId, tPartition);
}

Configuration conf = new Configuration();
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(conf).setBasePath(getHudiBasePath()).build();
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
String queryInstant = timeline.lastInstant().get().getTimestamp();
tHudiTable.setInstant_time(queryInstant);
tHudiTable.setHive_column_names(hudiProperties.get(HUDI_TABLE_COLUMN_NAMES));
tHudiTable.setHive_column_types(hudiProperties.get(HUDI_TABLE_COLUMN_TYPES));
tHudiTable.setInput_format(hudiProperties.get(HUDI_TABLE_INPUT_FOAMT));
tHudiTable.setSerde_lib(hudiProperties.get(HUDI_TABLE_SERDE_LIB));

TTableDescriptor tTableDescriptor =
new TTableDescriptor(id, TTableType.HUDI_TABLE, fullSchema.size(), 0, table, db);
tTableDescriptor.setHudiTable(tHudiTable);
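Note on the HudiTable.java changes above: initProps() now records the Hive column names and types as comma-separated strings, and toThrift() pins the scan to the latest completed instant of the active timeline, which is what lets the BE read a MOR table as a consistent snapshot. The condensed sketch below restates that logic with the same Hudi and Hive APIs used in the diff; the wrapper class and method names are illustrative only, and it joins columns in metastore order instead of matching them against the Avro schema as the full code does.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    import java.util.List;
    import java.util.stream.Collectors;

    import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromTypeString;

    // Illustrative sketch only: condenses the metadata assembled for THudiTable.
    final class HudiThriftMetadataSketch {
        // hive_column_names, e.g. "uuid,ts,name,par_col"
        static String joinColumnNames(List<FieldSchema> allFields) {
            return allFields.stream().map(FieldSchema::getName).collect(Collectors.joining(","));
        }

        // hive_column_types, e.g. "string,bigint,string,string" (normalized Hive type strings)
        static String joinColumnTypes(List<FieldSchema> allFields) {
            return allFields.stream()
                    .map(f -> getTypeInfoFromTypeString(f.getType()).getTypeName())
                    .collect(Collectors.joining(","));
        }

        // instant_time: the snapshot is pinned to the last completed instant, as in toThrift() above.
        static String latestCompletedInstant(String basePath) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(new Configuration()).setBasePath(basePath).build();
            HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
            return timeline.lastInstant().get().getTimestamp();
        }
    }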