diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp
index 2f42f7edc7215..fd282a49e4715 100644
--- a/be/src/connector/hive_connector.cpp
+++ b/be/src/connector/hive_connector.cpp
@@ -6,6 +6,7 @@
 #include "exec/vectorized/hdfs_scanner_orc.h"
 #include "exec/vectorized/hdfs_scanner_parquet.h"
 #include "exec/vectorized/hdfs_scanner_text.h"
+#include "exec/vectorized/jni_scanner.h"
 #include "exprs/expr.h"
 #include "storage/chunk_helper.h"
@@ -258,7 +259,50 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
     HdfsScanner* scanner = nullptr;
     auto format = scan_range.file_format;
-    if (format == THdfsFileFormat::PARQUET) {
+    if (dynamic_cast<const HudiTableDescriptor*>(_tuple_desc->table_desc()) && scan_range.hudi_mor_table) {
+        const auto* hudi_table = dynamic_cast<const HudiTableDescriptor*>(_hive_table);
+        auto* partition_desc = hudi_table->get_partition(scan_range.partition_id);
+        std::string partition_full_path = partition_desc->location();
+
+        std::string required_fields;
+        for (auto slot : _tuple_desc->slots()) {
+            required_fields.append(slot->col_name());
+            required_fields.append(",");
+        }
+        required_fields = required_fields.substr(0, required_fields.size() - 1);
+
+        std::string delta_file_paths;
+        if (!scan_range.hudi_logs.empty()) {
+            for (const std::string& log : scan_range.hudi_logs) {
+                delta_file_paths.append(partition_full_path).append("/").append(log);
+                delta_file_paths.append(",");
+            }
+            delta_file_paths = delta_file_paths.substr(0, delta_file_paths.size() - 1);
+        }
+
+        std::string data_file_path;
+        if (scan_range.relative_path.empty()) {
+            data_file_path = "";
+        } else {
+            data_file_path = partition_full_path + "/" + scan_range.relative_path;
+        }
+
+        std::map<std::string, std::string> jni_scanner_params;
+        jni_scanner_params["base_path"] = hudi_table->get_base_path();
+        jni_scanner_params["hive_column_names"] = hudi_table->get_hive_column_names();
+        jni_scanner_params["hive_column_types"] = hudi_table->get_hive_column_types();
+        jni_scanner_params["required_fields"] = required_fields;
+        jni_scanner_params["instant_time"] = hudi_table->get_instant_time();
+        jni_scanner_params["delta_file_paths"] = delta_file_paths;
+        jni_scanner_params["data_file_path"] = data_file_path;
+        jni_scanner_params["data_file_length"] = std::to_string(scan_range.file_length);
+        jni_scanner_params["serde"] = hudi_table->get_serde_lib();
+        jni_scanner_params["input_format"] = hudi_table->get_input_format();
+
+        std::string scanner_factory_class = "com/starrocks/hudi/reader/HudiSliceScannerFactory";
+
+        scanner = _pool.add(new JniScanner(scanner_factory_class, jni_scanner_params));
+    } else if (format == THdfsFileFormat::PARQUET) {
         scanner = _pool.add(new HdfsParquetScanner());
     } else if (format == THdfsFileFormat::ORC) {
         scanner = _pool.add(new HdfsOrcScanner());
diff --git a/be/src/connector/hive_connector.h b/be/src/connector/hive_connector.h
index 30369ebe86432..aec9998b99972 100644
--- a/be/src/connector/hive_connector.h
+++ b/be/src/connector/hive_connector.h
@@ -5,6 +5,7 @@
 #include "column/vectorized_fwd.h"
 #include "connector/connector.h"
 #include "exec/vectorized/hdfs_scanner.h"
+
 namespace starrocks {
 namespace connector {
diff --git a/be/src/exec/vectorized/hdfs_scanner.cpp b/be/src/exec/vectorized/hdfs_scanner.cpp
index e8c2cb10bc230..45fb44f55f94d 100644
--- a/be/src/exec/vectorized/hdfs_scanner.cpp
+++ b/be/src/exec/vectorized/hdfs_scanner.cpp
@@ -137,10 +137,6 @@ Status HdfsScanner::open(RuntimeState* runtime_state) {
     if (_opened) {
         return Status::OK();
     }
-    CHECK(_file == nullptr) << "File has already been opened";
- ASSIGN_OR_RETURN(_raw_file, _scanner_params.fs->new_random_access_file(_scanner_params.path)); - _file = std::make_unique( - std::make_shared(_raw_file->stream(), &_stats), _raw_file->filename()); _build_scanner_context(); auto status = do_open(runtime_state); if (status.ok()) { @@ -180,6 +176,14 @@ uint64_t HdfsScanner::exit_pending_queue() { return _pending_queue_sw.reset(); } +Status HdfsScanner::open_random_access_file() { + CHECK(_file == nullptr) << "File has already been opened"; + ASSIGN_OR_RETURN(_raw_file, _scanner_params.fs->new_random_access_file(_scanner_params.path)) + _file = std::make_unique( + std::make_shared(_raw_file->stream(), &_stats), _raw_file->filename()); + return Status::OK(); +} + void HdfsScanner::update_hdfs_counter(HdfsScanProfile* profile) { static const char* const kHdfsIOProfileSectionPrefix = "HdfsIO"; if (_file == nullptr) return; diff --git a/be/src/exec/vectorized/hdfs_scanner.h b/be/src/exec/vectorized/hdfs_scanner.h index 7d83566883658..6054236385d92 100644 --- a/be/src/exec/vectorized/hdfs_scanner.h +++ b/be/src/exec/vectorized/hdfs_scanner.h @@ -244,6 +244,9 @@ class HdfsScanner { // how long it stays inside pending queue. uint64_t exit_pending_queue(); +protected: + Status open_random_access_file(); + private: bool _opened = false; std::atomic _closed = false; diff --git a/be/src/exec/vectorized/hdfs_scanner_orc.cpp b/be/src/exec/vectorized/hdfs_scanner_orc.cpp index 6efbc228c98b1..03123e2884db7 100644 --- a/be/src/exec/vectorized/hdfs_scanner_orc.cpp +++ b/be/src/exec/vectorized/hdfs_scanner_orc.cpp @@ -411,6 +411,7 @@ bool OrcRowReaderFilter::filterOnPickStringDictionary( } Status HdfsOrcScanner::do_open(RuntimeState* runtime_state) { + RETURN_IF_ERROR(open_random_access_file()); auto input_stream = std::make_unique(_file.get(), _scanner_params.scan_ranges[0]->file_length); SCOPED_RAW_TIMER(&_stats.reader_init_ns); std::unique_ptr reader; diff --git a/be/src/exec/vectorized/hdfs_scanner_parquet.cpp b/be/src/exec/vectorized/hdfs_scanner_parquet.cpp index a82a8a29c2afb..4e1dd01522e82 100644 --- a/be/src/exec/vectorized/hdfs_scanner_parquet.cpp +++ b/be/src/exec/vectorized/hdfs_scanner_parquet.cpp @@ -51,6 +51,7 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) { } Status HdfsParquetScanner::do_open(RuntimeState* runtime_state) { + RETURN_IF_ERROR(open_random_access_file()); // create file reader _reader = std::make_shared(runtime_state->chunk_size(), _file.get(), _scanner_params.scan_ranges[0]->file_length); diff --git a/be/src/exec/vectorized/hdfs_scanner_text.cpp b/be/src/exec/vectorized/hdfs_scanner_text.cpp index f6afa7d55086c..353f845d0d2e4 100644 --- a/be/src/exec/vectorized/hdfs_scanner_text.cpp +++ b/be/src/exec/vectorized/hdfs_scanner_text.cpp @@ -130,6 +130,7 @@ Status HdfsTextScanner::do_init(RuntimeState* runtime_state, const HdfsScannerPa } Status HdfsTextScanner::do_open(RuntimeState* runtime_state) { + RETURN_IF_ERROR(open_random_access_file()); RETURN_IF_ERROR(_create_or_reinit_reader()); SCOPED_RAW_TIMER(&_stats.reader_init_ns); for (const auto slot : _scanner_params.materialize_slots) { diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index c1b1b333356c8..e07f09d19a708 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -148,6 +148,35 @@ HudiTableDescriptor::HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPo auto* partition = pool->add(new HdfsPartitionDescriptor(tdesc.hudiTable, entry.second)); _partition_id_to_desc_map[entry.first] = 
partition; } + _hudi_instant_time = tdesc.hudiTable.instant_time; + _hive_column_names = tdesc.hudiTable.hive_column_names; + _hive_column_types = tdesc.hudiTable.hive_column_types; + _input_format = tdesc.hudiTable.input_format; + _serde_lib = tdesc.hudiTable.serde_lib; +} + +const std::string& HudiTableDescriptor::get_base_path() const { + return _table_location; +} + +const std::string& HudiTableDescriptor::get_instant_time() const { + return _hudi_instant_time; +} + +const std::string& HudiTableDescriptor::get_hive_column_names() const { + return _hive_column_names; +} + +const std::string& HudiTableDescriptor::get_hive_column_types() const { + return _hive_column_types; +} + +const std::string& HudiTableDescriptor::get_input_format() const { + return _input_format; +} + +const std::string& HudiTableDescriptor::get_serde_lib() const { + return _serde_lib; } HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool) : TableDescriptor(tdesc) {} diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index f8aee18ca88f2..93f8188470032 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -230,6 +230,19 @@ class HudiTableDescriptor : public HiveTableDescriptor { HudiTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool); ~HudiTableDescriptor() override = default; bool has_partition() const override { return true; } + const std::string& get_base_path() const; + const std::string& get_instant_time() const; + const std::string& get_hive_column_names() const; + const std::string& get_hive_column_types() const; + const std::string& get_input_format() const; + const std::string& get_serde_lib() const; + +private: + std::string _hudi_instant_time; + std::string _hive_column_names; + std::string _hive_column_types; + std::string _input_format; + std::string _serde_lib; }; // =========================================== diff --git a/build.sh b/build.sh index f6779086f7a18..24915588bf5f7 100755 --- a/build.sh +++ b/build.sh @@ -328,7 +328,11 @@ if [ ${BUILD_BE} -eq 1 ]; then cp -r -p ${STARROCKS_HOME}/be/output/udf/include/* ${STARROCKS_OUTPUT}/udf/include/ cp -r -p ${STARROCKS_HOME}/java-extensions/jdbc-bridge/target/starrocks-jdbc-bridge-jar-with-dependencies.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages cp -r -p ${STARROCKS_HOME}/java-extensions/udf-extensions/target/udf-extensions-jar-with-dependencies.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages - cp -r -p ${STARROCKS_HOME}/java-extensions/java-utils/target/starrocks-java-utils-jar-with-dependencies.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages + cp -r -p ${STARROCKS_HOME}/java-extensions/java-utils/target/starrocks-java-utils.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages + cp -r -p ${STARROCKS_HOME}/java-extensions/jni-connector/target/starrocks-jni-connector.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages + cp -r -p ${STARROCKS_HOME}/java-extensions/hudi-reader/target/hudi-reader-lib ${STARROCKS_OUTPUT}/be/lib/ + cp -r -p ${STARROCKS_HOME}/java-extensions/hudi-reader/target/starrocks-hudi-reader.jar ${STARROCKS_OUTPUT}/be/lib/jni-packages + cp -r -p ${STARROCKS_HOME}/java-extensions/hudi-reader/target/starrocks-hudi-reader.jar ${STARROCKS_OUTPUT}/be/lib/hudi-reader-lib cp -r -p ${STARROCKS_THIRDPARTY}/installed/hadoop/share/hadoop/common ${STARROCKS_OUTPUT}/be/lib/hadoop/ cp -r -p ${STARROCKS_THIRDPARTY}/installed/hadoop/share/hadoop/hdfs ${STARROCKS_OUTPUT}/be/lib/hadoop/ cp -r -p ${STARROCKS_THIRDPARTY}/installed/hadoop/lib/native ${STARROCKS_OUTPUT}/be/lib/hadoop/ diff 
--git a/fe/fe-core/src/main/java/com/starrocks/catalog/HudiTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/HudiTable.java index a75e080877d32..6e346dc013877 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/HudiTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/HudiTable.java @@ -33,11 +33,13 @@ import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,11 +49,15 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import static com.starrocks.external.HiveMetaStoreTableUtils.isInternalCatalog; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromTypeString; +import static org.apache.hudi.common.table.HoodieTableConfig.CREATE_SCHEMA; /** * Currently, we depend on Hive metastore to obtain table/partition path and statistics. @@ -74,6 +80,11 @@ public class HudiTable extends Table implements HiveMetaStoreTable { public static final String HUDI_TABLE_PRE_COMBINE_FIELD = "hudi.table.preCombineField"; public static final String HUDI_BASE_PATH = "hudi.table.base.path"; public static final String HUDI_TABLE_BASE_FILE_FORMAT = "hudi.table.base.file.format"; + public static final String HUDI_TABLE_AVRO_RECORD_SCHEMA = "hudi.table.avro.record.schema"; + public static final String HUDI_TABLE_SERDE_LIB = "hudi.table.serde.lib"; + public static final String HUDI_TABLE_INPUT_FOAMT = "hudi.table.input.format"; + public static final String HUDI_TABLE_COLUMN_NAMES = "hudi.table.column.names"; + public static final String HUDI_TABLE_COLUMN_TYPES = "hudi.table.column.types"; public static final String HUDI_DB = "database"; public static final String HUDI_TABLE = "table"; public static final String HUDI_RESOURCE = "resource"; @@ -125,6 +136,10 @@ public String getHudiBasePath() { return hudiProperties.get(HUDI_BASE_PATH); } + public String getTableCreateSchema() { + return hudiProperties.get(HUDI_TABLE_AVRO_RECORD_SCHEMA); + } + @Override public String getTableName() { return table; @@ -254,33 +269,52 @@ private void initProps(Map properties) throws DdlException { throw new DdlException("Resource [" + resourceName + "] is not hudi resource"); } } - this.resourceName = resourceName; org.apache.hadoop.hive.metastore.api.Table metastoreTable = - HudiTable.validate(resourceName, this.db, this.table, this.fullSchema); + GlobalStateMgr.getCurrentState().getHiveRepository().getTable(resourceName, this.db, this.table); + Schema tableSchema = HudiTable.loadHudiSchema(metastoreTable); + + for (Column column : this.fullSchema) { + if (!column.isAllowNull()) { + throw new DdlException( + "Hudi extern table does not support no-nullable column: [" + column.getName() + "]"); + } + Schema.Field hudiColumn = tableSchema.getField(column.getName()); + if (hudiColumn == null) { + throw new DdlException("Column [" + 
column.getName() + "] not exists in hudi."); + } + // for each column in hudi schema, we should transfer hudi column type to starrocks type + // after that, we should check column type whether is same + // Only internal catalog like hudi external table need to validate column type + if (HiveMetaStoreTableUtils.isInternalCatalog(resourceName) && + !validColumnType(hudiColumn.schema(), column.getType())) { + throw new DdlException("Can not convert hudi column type [" + hudiColumn.schema().toString() + "] " + + "to starrocks type [" + column.getPrimitiveType() + "], column name: " + column.getName() + + ", starrocks type should be " + + HiveMetaStoreTableUtils.convertHudiTableColumnType(hudiColumn.schema()).toSql()); + } + } String hudiBasePath = metastoreTable.getSd().getLocation(); if (!Strings.isNullOrEmpty(hudiBasePath)) { hudiProperties.put(HUDI_BASE_PATH, hudiBasePath); } + String serdeLib = metastoreTable.getSd().getSerdeInfo().getSerializationLib(); + if (!Strings.isNullOrEmpty(serdeLib)) { + hudiProperties.put(HUDI_TABLE_SERDE_LIB, serdeLib); + } + String inputFormat = metastoreTable.getSd().getInputFormat(); + if (!Strings.isNullOrEmpty(inputFormat)) { + hudiProperties.put(HUDI_TABLE_INPUT_FOAMT, inputFormat); + } Configuration conf = new Configuration(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build(); HoodieTableConfig hudiTableConfig = metaClient.getTableConfig(); - Schema tableSchema; - try { - tableSchema = loadHudiSchema(metastoreTable); - } catch (Exception e) { - throw new DdlException("Cannot get hudi table schema."); - } - org.apache.hudi.common.model.HoodieTableType hudiTableType = hudiTableConfig.getTableType(); - if (hudiTableType == org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ) { - throw new DdlException("MERGE_ON_READ type of hudi table is NOT supported."); - } hudiProperties.put(HUDI_TABLE_TYPE, hudiTableType.name()); Option hudiTablePrimaryKey = hudiTableConfig.getRecordKeyFields(); @@ -296,6 +330,32 @@ private void initProps(Map properties) throws DdlException { HoodieFileFormat hudiBaseFileFormat = hudiTableConfig.getBaseFileFormat(); hudiProperties.put(HUDI_TABLE_BASE_FILE_FORMAT, hudiBaseFileFormat.name()); + StringBuilder columnNamesBuilder = new StringBuilder(); + StringBuilder columnTypesBuilder = new StringBuilder(); + List allFields = metastoreTable.getSd().getCols(); + allFields.addAll(metastoreTable.getPartitionKeys()); + + hudiProperties.put(HUDI_TABLE_AVRO_RECORD_SCHEMA, metaClient.getTableConfig().getString(CREATE_SCHEMA)); + + boolean isFirst = true; + for (Schema.Field hudiField : tableSchema.getFields()) { + if (!isFirst) { + columnNamesBuilder.append(","); + columnTypesBuilder.append(","); + } + Optional field = allFields.stream() + .filter(f -> f.getName().equals(hudiField.name().toLowerCase(Locale.ROOT))).findFirst(); + if (!field.isPresent()) { + throw new DdlException("Hudi column [" + hudiField.name() + "] not exists in hive metastore."); + } + TypeInfo fieldInfo = getTypeInfoFromTypeString(field.get().getType()); + columnNamesBuilder.append(field.get().getName()); + columnTypesBuilder.append(fieldInfo.getTypeName()); + isFirst = false; + } + hudiProperties.put(HUDI_TABLE_COLUMN_NAMES, columnNamesBuilder.toString()); + hudiProperties.put(HUDI_TABLE_COLUMN_TYPES, columnTypesBuilder.toString()); + Option hudiPartitionFields = hudiTableConfig.getPartitionFields(); if (hudiPartitionFields.isPresent()) { for (String partField : hudiPartitionFields.get()) { @@ -323,7 
+383,6 @@ private void initProps(Map properties) throws DdlException { this.dataColumnNames.add(hudiField.name()); } } - if (!copiedProps.isEmpty()) { throw new DdlException("Unknown table properties: " + copiedProps); } @@ -498,6 +557,17 @@ public TTableDescriptor toThrift(List p tHudiTable.putToPartitions(partitionId, tPartition); } + Configuration conf = new Configuration(); + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(conf).setBasePath(getHudiBasePath()).build(); + HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); + String queryInstant = timeline.lastInstant().get().getTimestamp(); + tHudiTable.setInstant_time(queryInstant); + tHudiTable.setHive_column_names(hudiProperties.get(HUDI_TABLE_COLUMN_NAMES)); + tHudiTable.setHive_column_types(hudiProperties.get(HUDI_TABLE_COLUMN_TYPES)); + tHudiTable.setInput_format(hudiProperties.get(HUDI_TABLE_INPUT_FOAMT)); + tHudiTable.setSerde_lib(hudiProperties.get(HUDI_TABLE_SERDE_LIB)); + TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.HUDI_TABLE, fullSchema.size(), 0, table, db); tTableDescriptor.setHudiTable(tHudiTable); diff --git a/fe/fe-core/src/main/java/com/starrocks/external/RemoteScanRangeLocations.java b/fe/fe-core/src/main/java/com/starrocks/external/RemoteScanRangeLocations.java index 0f0e0329df595..59f7a05718133 100644 --- a/fe/fe-core/src/main/java/com/starrocks/external/RemoteScanRangeLocations.java +++ b/fe/fe-core/src/main/java/com/starrocks/external/RemoteScanRangeLocations.java @@ -5,6 +5,8 @@ import com.google.common.collect.Lists; import com.starrocks.analysis.DescriptorTable; import com.starrocks.catalog.HiveMetaStoreTable; +import com.starrocks.catalog.HiveTable; +import com.starrocks.catalog.HudiTable; import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; import com.starrocks.common.Config; @@ -111,6 +113,34 @@ private void createScanRangeLocationsForSplit(long partitionId, HivePartition pa result.add(scanRangeLocations); } + private void createHudiScanRangeLocations(long partitionId, + HivePartition partition, + HdfsFileDesc fileDesc) { + TScanRangeLocations scanRangeLocations = new TScanRangeLocations(); + + THdfsScanRange hdfsScanRange = new THdfsScanRange(); + hdfsScanRange.setRelative_path(fileDesc.getFileName()); + hdfsScanRange.setOffset(0); + hdfsScanRange.setLength(fileDesc.getLength()); + hdfsScanRange.setPartition_id(partitionId); + hdfsScanRange.setFile_length(fileDesc.getLength()); + hdfsScanRange.setFile_format(partition.getFormat().toThrift()); + hdfsScanRange.setText_file_desc(fileDesc.getTextFileFormatDesc().toThrift()); + for (String log : fileDesc.getHudiDeltaLogs()) { + hdfsScanRange.addToHudi_logs(log); + } + hdfsScanRange.setHudi_mor_table((fileDesc.isHudiMORTable())); + TScanRange scanRange = new TScanRange(); + scanRange.setHdfs_scan_range(hdfsScanRange); + scanRangeLocations.setScan_range(scanRange); + + // TODO: get block info + TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress("-1", -1)); + scanRangeLocations.addToLocations(scanRangeLocation); + + result.add(scanRangeLocations); + } + public void setupScanRangeLocations(DescriptorTable descTbl, Table table, HDFSScanNodePredicates scanNodePredicates) throws UserException { HiveMetaStoreTable hiveMetaStoreTable = (HiveMetaStoreTable) table; @@ -129,20 +159,37 @@ public void setupScanRangeLocations(DescriptorTable descTbl, Table table, } List partitions = 
hiveMetaStoreTable.getPartitions(partitionKeys); - for (int i = 0; i < partitions.size(); i++) { - descTbl.addReferencedPartitions(table, partitionInfos.get(i)); - for (HdfsFileDesc fileDesc : partitions.get(i).getFiles()) { - if (fileDesc.getLength() == 0) { - continue; + if (table instanceof HiveTable) { + for (int i = 0; i < partitions.size(); i++) { + descTbl.addReferencedPartitions(table, partitionInfos.get(i)); + for (HdfsFileDesc fileDesc : partitions.get(i).getFiles()) { + if (fileDesc.getLength() == 0) { + continue; + } + for (HdfsFileBlockDesc blockDesc : fileDesc.getBlockDescs()) { + addScanRangeLocations(partitionInfos.get(i).getId(), partitions.get(i), fileDesc, blockDesc); + LOG.debug("Add scan range success. partition: {}, file: {}, block: {}-{}", + partitions.get(i).getFullPath(), fileDesc.getFileName(), blockDesc.getOffset(), + blockDesc.getLength()); + } } - for (HdfsFileBlockDesc blockDesc : fileDesc.getBlockDescs()) { - addScanRangeLocations(partitionInfos.get(i).getId(), partitions.get(i), fileDesc, blockDesc); - LOG.debug("Add scan range success. partition: {}, file: {}, block: {}-{}", - partitions.get(i).getFullPath(), fileDesc.getFileName(), blockDesc.getOffset(), - blockDesc.getLength()); + } + } else if (table instanceof HudiTable) { + for (int i = 0; i < partitions.size(); i++) { + descTbl.addReferencedPartitions(table, partitionInfos.get(i)); + for (HdfsFileDesc fileDesc : partitions.get(i).getFiles()) { + if (fileDesc.getLength() == -1 && fileDesc.getHudiDeltaLogs().isEmpty()) { + String message = "Error: get a empty hudi fileSlice"; + throw new StarRocksPlannerException(message, ErrorType.INTERNAL_ERROR); + } + createHudiScanRangeLocations(partitionInfos.get(i).getId(), partitions.get(i), fileDesc); } } + } else { + String message = "Only Hive/Hudi table is supported."; + throw new StarRocksPlannerException(message, ErrorType.INTERNAL_ERROR); } + LOG.debug("Get {} scan range locations cost: {} ms", getScanRangeLocationsSize(), (System.currentTimeMillis() - start)); } diff --git a/fe/fe-core/src/main/java/com/starrocks/external/hive/HdfsFileDesc.java b/fe/fe-core/src/main/java/com/starrocks/external/hive/HdfsFileDesc.java index 7a99025c8ab2b..f1fe72f4fd9ed 100644 --- a/fe/fe-core/src/main/java/com/starrocks/external/hive/HdfsFileDesc.java +++ b/fe/fe-core/src/main/java/com/starrocks/external/hive/HdfsFileDesc.java @@ -12,22 +12,20 @@ public class HdfsFileDesc { private ImmutableList blockDescs; private boolean splittable; private TextFileFormatDesc textFileFormatDesc; + private ImmutableList hudiDeltaLogs; + private boolean isHudiMORTable; public HdfsFileDesc(String fileName, String compression, long length, - ImmutableList blockDescs) { + ImmutableList blockDescs, ImmutableList hudiDeltaLogs, + boolean splittable, TextFileFormatDesc textFileFormatDesc, boolean isHudiMORTable) { this.fileName = fileName; this.compression = compression; this.length = length; this.blockDescs = blockDescs; - this.splittable = false; - } - - public HdfsFileDesc(String fileName, String compression, long length, - ImmutableList blockDescs, boolean splittable, - TextFileFormatDesc textFileFormatDesc) { - this(fileName, compression, length, blockDescs); + this.hudiDeltaLogs = hudiDeltaLogs; this.splittable = splittable; this.textFileFormatDesc = textFileFormatDesc; + this.isHudiMORTable = isHudiMORTable; } public String getFileName() { @@ -53,4 +51,13 @@ public boolean isSplittable() { public TextFileFormatDesc getTextFileFormatDesc() { return textFileFormatDesc; } + + public 
ImmutableList getHudiDeltaLogs() { + return hudiDeltaLogs; + } + + public boolean isHudiMORTable() { + return isHudiMORTable; + } + } diff --git a/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java b/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java index e5338a8513d67..086f27f231166 100644 --- a/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java +++ b/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java @@ -38,19 +38,16 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -64,6 +61,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; @@ -321,30 +319,23 @@ public HivePartition getHudiPartition(String dbName, String tableName, List getHudiFileDescs(StorageDescriptor sd, HoodieTableMetaClient metaClient, String partName) throws Exception { List fileDescs = Lists.newArrayList(); - FileSystem fileSystem = metaClient.getRawFs(); - HoodieEngineContext engineContext = new HoodieLocalEngineContext(conf); - HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build(); - HoodieTableFileSystemView fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, - metaClient, metadataConfig); - HoodieTimeline activeInstants = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - Option latestInstant = activeInstants.lastInstant(); - String queryInstant = latestInstant.get().getTimestamp(); - Iterator hoodieBaseFileIterator = fileSystemView - .getLatestBaseFilesBeforeOrOn(partName, queryInstant).iterator(); - while (hoodieBaseFileIterator.hasNext()) { - HoodieBaseFile baseFile = hoodieBaseFileIterator.next(); - - FileStatus fileStatus = HoodieInputFormatUtils.getFileStatus(baseFile); - BlockLocation[] blockLocations; - if (fileStatus instanceof LocatedFileStatus) { - blockLocations = ((LocatedFileStatus) fileStatus).getBlockLocations(); - } else { - blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); - } - List fileBlockDescs = getHdfsFileBlockDescs(blockLocations); - fileDescs.add(new HdfsFileDesc(baseFile.getFileName(), "", fileStatus.getLen(), - ImmutableList.copyOf(fileBlockDescs), 
HdfsFileFormat.isSplittable(sd.getInputFormat()), - getTextFileFormatDesc(sd))); + HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); + String globPath = String.format("%s/%s/*", metaClient.getBasePath(), partName); + List statuses = FSUtils.getGlobStatusExcludingMetaFolder(metaClient.getRawFs(), new Path(globPath)); + HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, + timeline, statuses.toArray(new FileStatus[0])); + String queryInstant = timeline.lastInstant().get().getTimestamp(); + Iterator hoodieFileSliceIterator = fileSystemView + .getLatestMergedFileSlicesBeforeOrOn(partName, queryInstant).iterator(); + while (hoodieFileSliceIterator.hasNext()) { + FileSlice fileSlice = hoodieFileSliceIterator.next(); + Optional baseFile = fileSlice.getBaseFile().toJavaOptional(); + String fileName = baseFile.map(BaseFile::getFileName).orElse(""); + long fileLength = baseFile.map(BaseFile::getFileLen).orElse(-1L); + List logs = fileSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()); + fileDescs.add(new HdfsFileDesc(fileName, "", fileLength, + ImmutableList.of(), ImmutableList.copyOf(logs), HdfsFileFormat.isSplittable(sd.getInputFormat()), + getTextFileFormatDesc(sd), metaClient.getTableType() == HoodieTableType.MERGE_ON_READ)); } return fileDescs; } @@ -692,7 +683,8 @@ public List getHdfsFileDescs(String dirPath, boolean isSplittable, BlockLocation[] blockLocations = locatedFileStatus.getBlockLocations(); List fileBlockDescs = getHdfsFileBlockDescs(blockLocations); fileDescs.add(new HdfsFileDesc(fileName, "", locatedFileStatus.getLen(), - ImmutableList.copyOf(fileBlockDescs), isSplittable, getTextFileFormatDesc(sd))); + ImmutableList.copyOf(fileBlockDescs), ImmutableList.of(), + isSplittable, getTextFileFormatDesc(sd), false)); } } catch (FileNotFoundException ignored) { // hive empty partition may not create directory diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java index ac0e4983fca47..1d8fe4b4cc733 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java @@ -88,6 +88,7 @@ import com.starrocks.thrift.TEsScanRange; import com.starrocks.thrift.TExecBatchPlanFragmentsParams; import com.starrocks.thrift.TExecPlanFragmentParams; +import com.starrocks.thrift.THdfsScanRange; import com.starrocks.thrift.TInternalScanRange; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TPipelineProfileLevel; @@ -2992,6 +2993,13 @@ public void appendScanRange(StringBuilder sb, List params) { .append(", shardid=").append(esScanRange.getShard_id()) .append("}"); } + THdfsScanRange hdfsScanRange = range.getScan_range().getHdfs_scan_range(); + if (hdfsScanRange != null) { + sb.append("{relative_path=").append(hdfsScanRange.getRelative_path()) + .append(", offset=").append(hdfsScanRange.getOffset()) + .append(", length=").append(hdfsScanRange.getLength()) + .append("}"); + } } sb.append("]"); } diff --git a/fe/fe-core/src/test/java/com/starrocks/catalog/HudiTableTest.java b/fe/fe-core/src/test/java/com/starrocks/catalog/HudiTableTest.java index 564ee6ff3cf05..ffb829cf3ddd6 100644 --- a/fe/fe-core/src/test/java/com/starrocks/catalog/HudiTableTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/catalog/HudiTableTest.java @@ -31,6 +31,7 @@ import mockit.Mocked; import org.apache.avro.Schema; import 
org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -80,18 +81,38 @@ public void testWithResourceName(@Mocked GlobalStateMgr globalStateMgr, hudiResource.setProperties(resourceProperties); List hudiFields = new ArrayList<>(); + hudiFields.add(new Schema.Field("_hoodie_commit_time", + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", null)); + hudiFields.add(new Schema.Field("_hoodie_commit_seqno", + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", null)); + hudiFields.add(new Schema.Field("_hoodie_record_key", + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", null)); + hudiFields.add(new Schema.Field("_hoodie_partition_path", + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", null)); + hudiFields.add(new Schema.Field("_hoodie_file_name", + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", null)); hudiFields.add(new Schema.Field("col1", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)), "", null)); hudiFields.add(new Schema.Field("col2", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)), "", null)); Schema hudiSchema = Schema.createRecord(hudiFields); - List partKeys = Lists.newArrayList(new FieldSchema("col1", "BIGINT", "")); - List unPartKeys = Lists.newArrayList(new FieldSchema("col2", "INT", "")); + List partKeys = Lists.newArrayList(new FieldSchema("col1", "bigint", "")); + List unPartKeys = Lists.newArrayList(); + unPartKeys.add(new FieldSchema("_hoodie_commit_time", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_commit_seqno", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_record_key", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_partition_path", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_file_name", "string", "")); + unPartKeys.add(new FieldSchema("col2", "int", "")); String hdfsPath = "hdfs://127.0.0.1:10000/hudi"; StorageDescriptor sd = new StorageDescriptor(); sd.setCols(unPartKeys); sd.setLocation(hdfsPath); + sd.setInputFormat("org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"); + SerDeInfo sdInfo = new SerDeInfo(); + sdInfo.setSerializationLib("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); + sd.setSerdeInfo(sdInfo); Table msTable = new Table(); msTable.setPartitionKeys(partKeys); msTable.setSd(sd); @@ -106,15 +127,9 @@ public void testWithResourceName(@Mocked GlobalStateMgr globalStateMgr, result = globalStateMgr; minTimes = 0; - globalStateMgr.getResourceMgr(); - result = resourceMgr; - resourceMgr.getResource("hudi0"); result = hudiResource; - globalStateMgr.getHiveRepository(); - result = hiveRepository; - hiveRepository.getTable(resourceName, hudiDb, hudiTable); result = msTable; diff --git a/fe/fe-core/src/test/java/com/starrocks/external/hive/HiveMetaCacheTest.java b/fe/fe-core/src/test/java/com/starrocks/external/hive/HiveMetaCacheTest.java index 319108c2a04b6..ca119467dad16 100644 --- a/fe/fe-core/src/test/java/com/starrocks/external/hive/HiveMetaCacheTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/external/hive/HiveMetaCacheTest.java @@ -291,7 +291,11 @@ public HivePartition 
getPartition(String dbName, String tableName, List ImmutableList.of(new HdfsFileDesc("file1", "", 10000L, - ImmutableList.of())), + ImmutableList.of(), + ImmutableList.of(), + false, + null, + false)), partitionPath); } diff --git a/fe/fe-core/src/test/java/com/starrocks/server/CatalogLevelTest.java b/fe/fe-core/src/test/java/com/starrocks/server/CatalogLevelTest.java index c31a60c859c3c..33cd760212fb0 100644 --- a/fe/fe-core/src/test/java/com/starrocks/server/CatalogLevelTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/server/CatalogLevelTest.java @@ -2,7 +2,6 @@ package com.starrocks.server; -import com.amazonaws.services.dynamodbv2.document.Expected; import com.google.common.collect.Lists; import com.starrocks.analysis.StatementBase; import com.starrocks.catalog.Database; @@ -23,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -36,8 +36,6 @@ import java.util.ArrayList; import java.util.List; -import javax.validation.constraints.AssertTrue; - import static org.apache.iceberg.types.Types.NestedField.optional; public class CatalogLevelTest { @@ -89,24 +87,53 @@ public void testQueryHudiCatalog(@Mocked MetadataMgr metadataMgr, @Mocked HiveRepository hiveRepository, @Mocked HoodieTableMetaClient metaClient, @Mocked TableSchemaResolver schemaUtil) throws Exception { + String catalogName = "hudi_catalog"; + String resourceName = "thrift://127.0.0.1:9083"; + String dbName = "hudi_db"; + String tableName = "hudi_table"; String createCatalog = "CREATE EXTERNAL CATALOG hudi_catalog PROPERTIES(\"type\"=\"hudi\", \"hive.metastore.uris\"=\"thrift://127.0.0.1:9083\")"; StatementBase statementBase = AnalyzeTestUtil.analyzeSuccess(createCatalog); Assert.assertTrue(statementBase instanceof CreateCatalogStmt); GlobalStateMgr.getCurrentState().getCatalogMgr().createCatalog((CreateCatalogStmt) statementBase); - List partKeys = Lists.newArrayList(new FieldSchema("col1", "BIGINT", "")); - List unPartKeys = Lists.newArrayList(new FieldSchema("col2", "INT", "")); + List partKeys = Lists.newArrayList(new FieldSchema("col1", "bigint", "")); + List unPartKeys = Lists.newArrayList(); + unPartKeys.add(new FieldSchema("_hoodie_commit_time", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_commit_seqno", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_record_key", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_partition_path", "string", "")); + unPartKeys.add(new FieldSchema("_hoodie_file_name", "string", "")); + unPartKeys.add(new FieldSchema("col2", "int", "")); String hdfsPath = "hdfs://127.0.0.1:10000/hudi"; StorageDescriptor sd = new StorageDescriptor(); sd.setCols(unPartKeys); sd.setLocation(hdfsPath); + sd.setInputFormat("org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"); + SerDeInfo sdInfo = new SerDeInfo(); + sdInfo.setSerializationLib("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); + sd.setSerdeInfo(sdInfo); Table msTable1 = new Table(); - msTable1.setDbName("hudi_db"); - msTable1.setTableName("hudi_table"); + msTable1.setDbName(dbName); + msTable1.setTableName(tableName); msTable1.setPartitionKeys(partKeys); msTable1.setSd(sd); msTable1.setTableType("MANAGED_TABLE"); List hudiFields = new ArrayList<>(); + 
hudiFields.add(new org.apache.avro.Schema.Field("_hoodie_commit_time", + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)), "", null)); + hudiFields.add(new org.apache.avro.Schema.Field("_hoodie_commit_seqno", + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)), "", null)); + hudiFields.add(new org.apache.avro.Schema.Field("_hoodie_record_key", + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)), "", null)); + hudiFields.add(new org.apache.avro.Schema.Field("_hoodie_partition_path", + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)), "", null)); + hudiFields.add(new org.apache.avro.Schema.Field("_hoodie_file_name", + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)), "", null)); hudiFields.add(new org.apache.avro.Schema.Field("col1", org.apache.avro.Schema.createUnion( org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), org.apache.avro.Schema.create( @@ -126,30 +153,27 @@ public Resource getResource(String name) { new Expectations() { { - GlobalStateMgr.getCurrentState().getResourceMgr(); - result = resourceMgr; - - GlobalStateMgr.getCurrentState().getHiveRepository(); - result = hiveRepository; - schemaUtil.getTableAvroSchema(); result = hudiSchema; + + hiveRepository.getTable(resourceName, dbName, tableName); + result = msTable1; } }; - com.starrocks.catalog.Table hudiTable = HiveMetaStoreTableUtils.convertHudiConnTableToSRTable(msTable1, "thrift://127.0.0.1:9083"); + com.starrocks.catalog.Table hudiTable = HiveMetaStoreTableUtils.convertHudiConnTableToSRTable(msTable1, resourceName); new Expectations() { { - metadataMgr.getDb("hudi_catalog", "hudi_db"); - result = new Database(111, "hudi_db"); + metadataMgr.getDb(catalogName, dbName); + result = new Database(111, dbName); minTimes = 0; - metadataMgr.getTable("hudi_catalog", "hudi_db", "hudi_table"); + metadataMgr.getTable(catalogName, dbName, tableName); result = hudiTable; } }; - String sql_1 = "select col1 from hudi_catalog.hudi_db.hudi_table"; + String sql_1 = "select col1 from " + catalogName + "." + dbName + "." + tableName; AnalyzeTestUtil.analyzeSuccess(sql_1); } diff --git a/fe/pom.xml b/fe/pom.xml index 1146b9f79e3ef..d7653326ecb1b 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -49,7 +49,7 @@ under the License. 
1.10.1 3.3.3 false - 0.10.0 + 0.10.1 3.1.2-13 0.2.14 starrocks diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 0bc61aa561a43..e9ed41c17a427 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -330,6 +330,21 @@ struct THudiTable { // The prefixes of locations of partitions in this table 5: optional list partition_prefixes + + // hudi table instant time + 6: optional string instant_time + + // hudi table hive_column_names + 7: optional string hive_column_names + + // hudi table hive_column_types + 8: optional string hive_column_types + + // hudi table input_format + 9: optional string input_format + + // hudi table serde_lib + 10: optional string serde_lib } struct TJDBCTable { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index c17aed762f996..d622ef73a5b0d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -262,6 +262,12 @@ struct THdfsScanRange { // for iceberg table scanrange should contains the full path of file 8: optional string full_path + + // delta logs of hudi MOR table + 9: optional list hudi_logs + + // hudi table type + 10: optional bool hudi_mor_table } // Specification of an individual data range which is held in its entirety diff --git a/java-extensions/hudi-reader/pom.xml b/java-extensions/hudi-reader/pom.xml new file mode 100644 index 0000000000000..607e004d074c8 --- /dev/null +++ b/java-extensions/hudi-reader/pom.xml @@ -0,0 +1,168 @@ + + + + java-extensions + com.starrocks + 1.0-SNAPSHOT + + 4.0.0 + + hudi-reader + + + 8 + 8 + ${basedir}/../ + 1.7.32 + 0.11.1 + 2.6.7 + 2.6.7.3 + + + + + com.starrocks + jni-connector + 1.0.0 + + + + org.apache.hudi + hudi-presto-bundle + ${hudi.version} + + + com.google.protobuf + protobuf-java + + + commons-lang + commons-lang + + + org.apache.hudi + hudi-common + + + org.apache.hudi + hudi-hadoop-mr-bundle + + + org.apache.parquet + parquet-avro + + + org.apache.avro + avro + + + + + + com.facebook.presto.hive + hive-apache + 3.0.0-8 + + + org.slf4j + slf4j-log4j12 + + + + + + com.facebook.presto.hadoop + hadoop-apache2 + 2.7.4-9 + + + org.slf4j + slf4j-log4j12 + + + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + + com.fasterxml.jackson.core + jackson-annotations + ${fasterxml.version} + + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.jackson.databind.version} + + + + + + starrocks-hudi-reader + + + org.apache.maven.plugins + maven-compiler-plugin + ${compiler-plugin.version} + + ${java.version} + ${java.version} + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.1 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/hudi-reader-lib + true + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.1.1 + + + + jar-with-dependencies + + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/java-extensions/hudi-reader/src/main/java/com/starrocks/hudi/reader/HudiSliceScanner.java b/java-extensions/hudi-reader/src/main/java/com/starrocks/hudi/reader/HudiSliceScanner.java new file mode 100644 index 0000000000000..785bba4bd93aa --- /dev/null +++ b/java-extensions/hudi-reader/src/main/java/com/starrocks/hudi/reader/HudiSliceScanner.java @@ -0,0 +1,239 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc. 
+ +package com.starrocks.hudi.reader; + +import com.starrocks.jni.connector.ConnectorScanner; +import com.starrocks.jni.connector.TypeMapping; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.stream.Collectors.toList; + +public class HudiSliceScanner extends ConnectorScanner { + + private final String basePath; + private final String hiveColumnNames; + private final String[] hiveColumnTypes; + private final String[] requiredFields; + private final String instantTime; + private final String[] deltaFilePaths; + private final String dataFilePath; + private final long dataFileLenth; + private final String serde; + private final String inputFormat; + private RecordReader reader; + private StructObjectInspector rowInspector; + private ObjectInspector[] fieldInspectors; + private StructField[] structFields; + private Deserializer deserializer; + private final int fetchSize; + + public HudiSliceScanner(int fetchSize, Map params) { + this.fetchSize = fetchSize; + this.basePath = params.get("base_path"); + this.hiveColumnNames = params.get("hive_column_names"); + this.hiveColumnTypes = params.get("hive_column_types").split(","); + this.requiredFields = params.get("required_fields").split(","); + this.instantTime = params.get("instant_time"); + if (params.get("delta_file_paths").length() == 0) { + this.deltaFilePaths = new String[0]; + } else { + this.deltaFilePaths = params.get("delta_file_paths").split(","); + } + this.dataFilePath = params.get("data_file_path"); + this.dataFileLenth = Long.parseLong(params.get("data_file_length")); + this.serde = params.get("serde"); + this.inputFormat = params.get("input_format");; + this.fieldInspectors = new ObjectInspector[requiredFields.length]; + this.structFields = new StructField[requiredFields.length]; + } + + @Override + public void open() throws IOException { + try { + Properties properties = new Properties(); + Configuration conf = new Configuration(); + conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + conf.setBoolean("dfs.client.use.legacy.blockreader", false); + JobConf jobConf = new JobConf(conf); + jobConf.setBoolean("hive.io.file.read.all.columns", false); + String[] hiveColumnNames = this.hiveColumnNames.split(","); + HashMap hiveColumnNameToIndex = new HashMap<>(); + HashMap hiveColumnNameToType = new HashMap<>(); + for (int i = 0; i < 
hiveColumnNames.length; i++) { + hiveColumnNameToIndex.put(hiveColumnNames[i], i); + hiveColumnNameToType.put(hiveColumnNames[i], hiveColumnTypes[i]); + } + + String[] requiredTypes = new String[requiredFields.length]; + StringBuilder columnIdBuilder = new StringBuilder(); + boolean isFirst = true; + for (int i = 0; i < requiredFields.length; i++) { + if (!isFirst) { + columnIdBuilder.append(","); + } + columnIdBuilder.append(hiveColumnNameToIndex.get(requiredFields[i])); + requiredTypes[i] = hiveColumnNameToType.get(requiredFields[i]); + isFirst = false; + } + initOffHeapTableWriter(requiredTypes, fetchSize, TypeMapping.hiveTypeMappings); + + properties.setProperty("hive.io.file.readcolumn.ids", columnIdBuilder.toString()); + properties.setProperty("hive.io.file.readcolumn.names", String.join(",", this.requiredFields)); + properties.setProperty("columns", this.hiveColumnNames); + properties.setProperty("columns.types", String.join(",", this.hiveColumnTypes)); + properties.setProperty("serialization.lib", this.serde); + properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name))); + + // dataFileLenth==-1 or dataFilePath == "" means logs only scan + String realtimePath = dataFileLenth != -1 ? dataFilePath : deltaFilePaths[0]; + long realtimeLength = dataFileLenth != -1 ? dataFileLenth : 0; + Path path = new Path(realtimePath); + FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, new String[] {""}); + List logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new).collect(toList()); + FileSplit hudiSplit = new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, instantTime, false, Option.empty()); + + InputFormat inputFormatClass = createInputFormat(jobConf, inputFormat); + reader = (RecordReader) inputFormatClass + .getRecordReader(hudiSplit, jobConf, Reporter.NULL); + + deserializer = getDeserializer(jobConf, properties, serde); + rowInspector = getTableObjectInspector(deserializer); + for (int i = 0; i < requiredFields.length; i++) { + StructField field = rowInspector.getStructFieldRef(requiredFields[i]); + structFields[i] = field; + fieldInspectors[i] = field.getFieldObjectInspector(); + } + + } catch (Exception e) { + close(); + e.printStackTrace(); + throw new IOException("Failed to open the hudi MOR slice reader.", e); + } + } + + @Override + public void close() throws IOException { + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + e.printStackTrace(); + throw new IOException("Failed to close the hudi MOR slice reader.", e); + } + } + + @Override + public int getNext() throws IOException { + try { + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + int numRows = 0; + for (; numRows < getTableSize(); numRows++) { + if (!reader.next(key, value)) { + break; + } + Object rowData = deserializer.deserialize(value); + for (int i = 0; i < requiredFields.length; i++) { + Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]); + if (fieldData == null) { + scanData(i, null); + } else { + Object fieldValue = ((PrimitiveObjectInspector) fieldInspectors[i]).getPrimitiveJavaObject(fieldData); + scanData(i, fieldValue); + } + } + } + return numRows; + } catch (Exception e) { + close(); + e.printStackTrace(); + throw new IOException("Failed to get the next off-heap table chunk of hudi.", e); + } + } + + private InputFormat createInputFormat(Configuration conf, String inputFormat) throws Exception { + Class clazz = conf.getClassByName(inputFormat); + 
Class> cls = (Class>) clazz.asSubclass(InputFormat.class); + return ReflectionUtils.newInstance(cls, conf); + } + + private Deserializer getDeserializer(Configuration configuration, Properties properties, String name) throws Exception { + Class deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader()) + .asSubclass(Deserializer.class); + Deserializer deserializer = deserializerClass.getConstructor().newInstance(); + deserializer.initialize(configuration, properties); + return deserializer; + } + + private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception { + ObjectInspector inspector = deserializer.getObjectInspector(); + checkArgument(inspector.getCategory() == ObjectInspector.Category.STRUCT, + "expected STRUCT: %s", inspector.getCategory()); + return (StructObjectInspector) inspector; + } + + /** + * For test only + */ + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("basePath: "); + sb.append(basePath); + sb.append("\n"); + sb.append("hiveColumnNames: "); + sb.append(hiveColumnNames); + sb.append("\n"); + sb.append("hiveColumnTypes: "); + sb.append(Arrays.toString(hiveColumnTypes)); + sb.append("\n"); + sb.append("requiredFields: "); + sb.append(Arrays.toString(requiredFields)); + sb.append("\n"); + sb.append("instantTime: "); + sb.append(instantTime); + sb.append("\n"); + sb.append("deltaFilePaths: "); + sb.append(Arrays.toString(deltaFilePaths)); + sb.append("\n"); + sb.append("dataFilePath: "); + sb.append(dataFilePath); + sb.append("\n"); + sb.append("dataFileLenth: "); + sb.append(dataFileLenth); + sb.append("\n"); + sb.append("serde: "); + sb.append(serde); + sb.append("\n"); + sb.append("inputFormat: "); + sb.append(inputFormat); + sb.append("\n"); + return sb.toString(); + } +} \ No newline at end of file diff --git a/java-extensions/hudi-reader/src/main/java/com/starrocks/hudi/reader/HudiSliceScannerFactory.java b/java-extensions/hudi-reader/src/main/java/com/starrocks/hudi/reader/HudiSliceScannerFactory.java new file mode 100644 index 0000000000000..6625cf885fdbd --- /dev/null +++ b/java-extensions/hudi-reader/src/main/java/com/starrocks/hudi/reader/HudiSliceScannerFactory.java @@ -0,0 +1,45 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc. + +package com.starrocks.hudi.reader; + +import com.starrocks.jni.connector.ScannerFactory; +import com.starrocks.utils.loader.ChildFirstClassLoader; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Arrays; +import java.util.Objects; + +public class HudiSliceScannerFactory implements ScannerFactory { + static ChildFirstClassLoader classLoader; + static { + String basePath = System.getenv("STARROCKS_HOME"); + File dir = new File(basePath + "/lib/hudi-reader-lib"); + URL[] jars = Arrays.stream(Objects.requireNonNull(dir.listFiles())) + .map(f -> { + try { + return f.toURI().toURL(); + } catch (MalformedURLException e) { + e.printStackTrace(); + throw new RuntimeException("Cannot init hudi slice classloader.", e); + } + }).toArray(URL[]::new); + classLoader = new ChildFirstClassLoader(jars, ClassLoader.getSystemClassLoader()); + } + + /** + * Hudi scanner uses own independent classloader to find all classes + * due to hadoop class conflicts with JNI launcher of libhdfs (hadoop-3.x). 
+ */ + @Override + public Class getScannerClass() throws ClassNotFoundException { + Thread.currentThread().setContextClassLoader(classLoader); + try { + return classLoader.loadClass(HudiSliceScanner.class.getName()); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + throw e; + } + } +} diff --git a/java-extensions/java-utils/src/main/java/com/starrocks/utils/loader/ChildFirstClassLoader.java b/java-extensions/java-utils/src/main/java/com/starrocks/utils/loader/ChildFirstClassLoader.java new file mode 100644 index 0000000000000..6dd3c429a2061 --- /dev/null +++ b/java-extensions/java-utils/src/main/java/com/starrocks/utils/loader/ChildFirstClassLoader.java @@ -0,0 +1,67 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc. + +package com.starrocks.utils.loader; + +import com.starrocks.utils.NativeMethodHelper; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; + +/** + * Reference to Apache Spark with some customization + * A mutable class loader that gives preference to its own URLs over the parent class loader + * when loading classes and resources. + */ +public class ChildFirstClassLoader extends URLClassLoader { + static { + ClassLoader.registerAsParallelCapable(); + } + + private ParentClassLoader parent; + private ArrayList parentFirstClass; + + public ChildFirstClassLoader(URL[] urls, ClassLoader parent) { + super(urls, null); + this.parent = new ParentClassLoader(parent); + // load native method class from parent + this.parentFirstClass = new ArrayList<>(Collections.singleton(NativeMethodHelper.class.getName())); + } + + @Override + public Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (!parentFirstClass.isEmpty() && parentFirstClass.stream().anyMatch(c -> c.equals(name))) { + return parent.loadClass(name, resolve); + } + try { + return super.loadClass(name, resolve); + } catch (ClassNotFoundException cnf) { + return parent.loadClass(name, resolve); + } + } + + @Override + public Enumeration getResources(String name) throws IOException { + ArrayList urls = Collections.list(super.getResources(name)); + urls.addAll(Collections.list(parent.getResources(name))); + return Collections.enumeration(urls); + } + + @Override + public URL getResource(String name) { + URL url = super.getResource(name); + if (url != null) { + return url; + } else { + return parent.getResource(name); + } + } + + @Override + public void addURL(URL url) { + super.addURL(url); + } +} diff --git a/java-extensions/java-utils/src/main/java/com/starrocks/utils/loader/ParentClassLoader.java b/java-extensions/java-utils/src/main/java/com/starrocks/utils/loader/ParentClassLoader.java new file mode 100644 index 0000000000000..07f7f66d0a909 --- /dev/null +++ b/java-extensions/java-utils/src/main/java/com/starrocks/utils/loader/ParentClassLoader.java @@ -0,0 +1,28 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc. + +package com.starrocks.utils.loader; + +/** + * Reference to Apache Spark with some customization + * A class loader which makes some protected methods in ClassLoader accessible. 
+ */ +public class ParentClassLoader extends ClassLoader { + + static { + ClassLoader.registerAsParallelCapable(); + } + + public ParentClassLoader(ClassLoader parent) { + super(parent); + } + + @Override + public Class findClass(String name) throws ClassNotFoundException { + return super.findClass(name); + } + + @Override + public Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + return super.loadClass(name, resolve); + } +} \ No newline at end of file diff --git a/java-extensions/pom.xml b/java-extensions/pom.xml index 09af606605d18..905fc4a8aa379 100644 --- a/java-extensions/pom.xml +++ b/java-extensions/pom.xml @@ -13,6 +13,7 @@ udf-extensions java-utils jni-connector + hudi-reader starrocks-java-extensions
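Two illustrative sketches, not part of the patch, may help when reviewing how the pieces above fit together.

First, the BE hands each Hudi file slice to the Java reader purely as string key/value pairs, so the contract between HiveDataSource::_init_scanner and HudiSliceScanner is just the parameter keys built in hive_connector.cpp. Below is a minimal sketch of driving the scanner with an equivalent map; the paths, column lists, instant time, and fetch size are made-up placeholders, and the off-heap table plumbing inherited from ConnectorScanner still expects the BE's native library to be loaded, so this is only meant to show the key/value contract.

import com.starrocks.hudi.reader.HudiSliceScanner;

import java.util.HashMap;
import java.util.Map;

public class HudiSliceScannerSketch {
    public static void main(String[] args) throws Exception {
        // Keys mirror jni_scanner_params in HiveDataSource::_init_scanner; all values are placeholders.
        Map<String, String> params = new HashMap<>();
        params.put("base_path", "hdfs://127.0.0.1:10000/hudi/hudi_mor_table");
        params.put("hive_column_names", "_hoodie_commit_time,col1,col2");
        params.put("hive_column_types", "string,bigint,int");
        params.put("required_fields", "col1,col2");
        params.put("instant_time", "20220101000000");
        // An empty string means the slice has no delta logs, i.e. a base-file-only scan.
        params.put("delta_file_paths", "");
        params.put("data_file_path", "hdfs://127.0.0.1:10000/hudi/hudi_mor_table/par1/a1.parquet");
        params.put("data_file_length", "1024");
        params.put("serde", "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe");
        params.put("input_format", "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat");

        HudiSliceScanner scanner = new HudiSliceScanner(4096, params);
        System.out.println(scanner); // toString() dumps the parsed slice description (test helper)
        scanner.open();
        try {
            int rows;
            while ((rows = scanner.getNext()) > 0) {
                System.out.println("materialized " + rows + " rows into the off-heap table");
            }
        } finally {
            scanner.close();
        }
    }
}

Second, HudiSliceScannerFactory hides the jars copied into lib/hudi-reader-lib behind a ChildFirstClassLoader, delegating only NativeMethodHelper to the parent, presumably so the child-loaded reader and the JNI launcher share one copy of the class that exposes the native off-heap writer methods. The sketch below shows that delegation behavior under an assumed install path; in the BE the directory is resolved from $STARROCKS_HOME/lib/hudi-reader-lib instead.

import com.starrocks.utils.loader.ChildFirstClassLoader;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class ChildFirstLoaderSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical location of the isolated Hudi/Hive/Hadoop jars.
        File dir = new File("/opt/starrocks/be/lib/hudi-reader-lib");
        List<URL> jars = new ArrayList<>();
        for (File f : dir.listFiles()) {
            jars.add(f.toURI().toURL());
        }
        ChildFirstClassLoader loader =
                new ChildFirstClassLoader(jars.toArray(new URL[0]), ClassLoader.getSystemClassLoader());

        // Classes packaged into hudi-reader-lib resolve from the child loader first ...
        Class<?> scanner = loader.loadClass("com.starrocks.hudi.reader.HudiSliceScanner");
        // ... while NativeMethodHelper is always delegated to the parent loader.
        Class<?> helper = loader.loadClass("com.starrocks.utils.NativeMethodHelper");

        System.out.println(scanner.getClassLoader()); // the ChildFirstClassLoader
        System.out.println(helper.getClassLoader());  // the parent (system) loader
    }
}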