From 69829a46688eb10efa04f0188bfe89c0deaf47b6 Mon Sep 17 00:00:00 2001
From: stephen <91597003+stephen-shelby@users.noreply.github.com>
Date: Fri, 9 Aug 2024 14:34:08 +0800
Subject: [PATCH] [Feature] support iceberg partitions table (#49455)

Signed-off-by: stephen
---
 be/src/runtime/descriptors.cpp                |   3 +-
 .../iceberg/IcebergApiConverter.java          |  21 ++
 .../connector/iceberg/IcebergMetadata.java    |   2 +-
 .../connector/metadata/MetadataTableType.java |   3 +-
 .../iceberg/IcebergMetadataTableFactory.java  |   2 +
 .../iceberg/IcebergPartitionsTable.java       | 104 ++++++++++
 .../starrocks/sql/optimizer/Optimizer.java    |   2 +
 .../LogicalIcebergMetadataScanOperator.java   |  10 +
 .../sql/optimizer/rule/RuleType.java          |   1 +
 .../IcebergPartitionsTableRewriteRule.java    | 116 +++++++++++
 gensrc/thrift/Types.thrift                    |   5 +-
 .../share/iceberg/IcebergPartitionUtils.java  |  55 ++++++
 .../iceberg-metadata-reader/pom.xml           |  12 ++
 .../iceberg/IcebergFilesTableScanner.java     |   2 +-
 .../iceberg/IcebergManifestsTableScanner.java |   2 +-
 .../iceberg/IcebergMetadataColumnValue.java   |  15 +-
 .../IcebergMetadataScannerFactory.java        |   3 +
 .../IcebergPartitionsTableScanner.java        | 181 ++++++++++++++++++
 .../iceberg/IcebergRefsTableScanner.java      |   2 +-
 java-extensions/pom.xml                       |   1 +
 test/sql/test_iceberg/R/test_metadata_table   |  21 +-
 test/sql/test_iceberg/T/test_metadata_table   |  19 ++
 22 files changed, 569 insertions(+), 13 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergPartitionsTable.java
 create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/IcebergPartitionsTableRewriteRule.java
 create mode 100644 java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/IcebergPartitionUtils.java
 create mode 100644 java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergPartitionsTableScanner.java

diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 76068b74821c2f..89541e2ab849e5 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -741,7 +741,8 @@ Status DescriptorTbl::create(RuntimeState* state, ObjectPool* pool, const TDescr
     case TTableType::ICEBERG_METADATA_LOG_ENTRIES_TABLE:
     case TTableType::ICEBERG_SNAPSHOTS_TABLE:
     case TTableType::ICEBERG_MANIFESTS_TABLE:
-    case TTableType::ICEBERG_FILES_TABLE: {
+    case TTableType::ICEBERG_FILES_TABLE:
+    case TTableType::ICEBERG_PARTITIONS_TABLE: {
        desc = pool->add(new IcebergMetadataTableDescriptor(tdesc, pool));
        break;
    }
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergApiConverter.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergApiConverter.java
index 7e9d70540160ae..fa0e84c099c83c 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergApiConverter.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergApiConverter.java
@@ -399,4 +399,25 @@ private static String toPartitionField(PartitionSpec spec, PartitionField field)
         throw new StarRocksConnectorException("Unsupported partition transform: " + field);
     }
+
+    public static List<StructField> getPartitionColumns(List<PartitionField> fields, Schema schema) {
+        if (fields.isEmpty()) {
+            return Lists.newArrayList();
+        }
+
+        List<StructField> partitionColumns = Lists.newArrayList();
+        for (PartitionField field : fields) {
+            Type srType;
+            org.apache.iceberg.types.Type icebergType = field.transform().getResultType(schema.findType(field.sourceId()));
+            try {
+                srType = fromIcebergType(icebergType);
+            } catch (InternalError | Exception e) {
+                LOG.error("Failed to convert iceberg type {}", icebergType, e);
+                throw new StarRocksConnectorException("Failed to convert iceberg type %s", icebergType);
+            }
+            StructField column = new StructField(field.name(), srType);
+            partitionColumns.add(column);
+        }
+        return partitionColumns;
+    }
 }
\ No newline at end of file
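Reviewer note: getPartitionColumns leans on Transform#getResultType, i.e. each struct field's type is the transform's output type, not the source column's type. A self-contained sketch against the Iceberg API (schema and field ids invented for illustration):

    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class PartitionResultTypeDemo {
        public static void main(String[] args) {
            Schema schema = new Schema(
                    Types.NestedField.required(1, "ts", Types.TimestampType.withZone()),
                    Types.NestedField.required(2, "p1", Types.IntegerType.get()));
            PartitionSpec spec = PartitionSpec.builderFor(schema).month("ts").identity("p1").build();
            for (PartitionField field : spec.fields()) {
                // month(ts) reports int as its result type; identity(p1) keeps the source type
                System.out.println(field.name() + " -> "
                        + field.transform().getResultType(schema.findType(field.sourceId())));
            }
        }
    }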
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java
index 66e09a3cfd0308..5411cac0d4bb73 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java
@@ -665,7 +665,7 @@ public SerializedMetaSpec getSerializedMetaSpec(String dbName, String tableName,
         List<ManifestFile> deleteManifests = snapshot.deleteManifests(nativeTable.io());
         List<ManifestFile> matchingDeleteManifests = filterManifests(deleteManifests, nativeTable, predicate);
-        if (metadataTableType == MetadataTableType.FILES) {
+        if (metadataTableType == MetadataTableType.FILES || metadataTableType == MetadataTableType.PARTITIONS) {
             for (ManifestFile file : matchingDeleteManifests) {
                 remoteMetaSplits.add(IcebergMetaSplit.from(file));
             }
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/metadata/MetadataTableType.java b/fe/fe-core/src/main/java/com/starrocks/connector/metadata/MetadataTableType.java
index 72b8c6631b0398..9fd60a35eaeea4 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/metadata/MetadataTableType.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/metadata/MetadataTableType.java
@@ -21,7 +21,8 @@ public enum MetadataTableType {
     METADATA_LOG_ENTRIES("metadata_log_entries"),
     SNAPSHOTS("snapshots"),
     MANIFESTS("manifests"),
-    FILES("files");
+    FILES("files"),
+    PARTITIONS("partitions");
 
     public final String typeString;
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergMetadataTableFactory.java b/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergMetadataTableFactory.java
index 2d2d0e91ee078d..023fd3624af87f 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergMetadataTableFactory.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergMetadataTableFactory.java
@@ -42,6 +42,8 @@ public Table createTable(String catalogName, String dbName, String tableName, Me
             return IcebergManifestsTable.create(catalogName, dbName, tableName);
         case FILES:
             return IcebergFilesTable.create(catalogName, dbName, tableName);
+        case PARTITIONS:
+            return IcebergPartitionsTable.create(catalogName, dbName, tableName);
         default:
             LOG.error("Unrecognized iceberg metadata table type {}", tableType);
             throw new StarRocksConnectorException("Unrecognized iceberg metadata table type %s", tableType);
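The new enum constant is what ties the `$partitions` suffix in a query to the planner. A minimal sketch of suffix resolution via the enum's public typeString field (this resolve helper is hypothetical, not part of the patch; it compiles against fe-core):

    import com.starrocks.connector.metadata.MetadataTableType;

    public class MetadataSuffixResolver {
        // Hypothetical helper: maps the "partitions" suffix of "db.tbl$partitions" onto the enum.
        static MetadataTableType resolve(String suffix) {
            for (MetadataTableType type : MetadataTableType.values()) {
                if (type.typeString.equalsIgnoreCase(suffix)) {
                    return type;
                }
            }
            throw new IllegalArgumentException("unknown metadata table type: " + suffix);
        }

        public static void main(String[] args) {
            System.out.println(resolve("partitions")); // PARTITIONS
        }
    }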
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergPartitionsTable.java b/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergPartitionsTable.java
new file mode 100644
index 00000000000000..7e0d5306feae0a
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergPartitionsTable.java
@@ -0,0 +1,104 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.connector.metadata.iceberg;
+
+import com.starrocks.analysis.DescriptorTable;
+import com.starrocks.catalog.Column;
+import com.starrocks.catalog.IcebergTable;
+import com.starrocks.catalog.PrimitiveType;
+import com.starrocks.catalog.StructField;
+import com.starrocks.catalog.StructType;
+import com.starrocks.catalog.Table;
+import com.starrocks.connector.ConnectorTableId;
+import com.starrocks.connector.exception.StarRocksConnectorException;
+import com.starrocks.connector.iceberg.IcebergApiConverter;
+import com.starrocks.connector.metadata.MetadataTable;
+import com.starrocks.connector.metadata.MetadataTableType;
+import com.starrocks.connector.share.iceberg.IcebergPartitionUtils;
+import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.thrift.THdfsTable;
+import com.starrocks.thrift.TTableDescriptor;
+import com.starrocks.thrift.TTableType;
+import org.apache.iceberg.PartitionField;
+
+import java.util.List;
+
+import static com.starrocks.catalog.ScalarType.createType;
+import static com.starrocks.connector.metadata.TableMetaMetadata.METADATA_DB_NAME;
+
+public class IcebergPartitionsTable extends MetadataTable {
+    public static final String TABLE_NAME = "iceberg_partitions_table";
+
+    public IcebergPartitionsTable(String catalogName, long id, String name, Table.TableType type,
+                                  List<Column> baseSchema, String originDb, String originTable,
+                                  MetadataTableType metadataTableType) {
+        super(catalogName, id, name, type, baseSchema, originDb, originTable, metadataTableType);
+    }
+
+    public static IcebergPartitionsTable create(String catalogName, String originDb, String originTable) {
+        Table icebergTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(catalogName, originDb, originTable);
+        if (icebergTable == null) {
+            throw new StarRocksConnectorException("table [%s.%s.%s] does not exist", catalogName, originDb, originTable);
+        }
+        if (!icebergTable.isIcebergTable()) {
+            throw new StarRocksConnectorException("table [%s.%s.%s] isn't an iceberg table", catalogName, originDb, originTable);
+        }
+        org.apache.iceberg.Table table = ((IcebergTable) icebergTable).getNativeTable();
+        Builder builder = builder();
+        if (table.spec().isPartitioned()) {
+            List<PartitionField> partitionFields = IcebergPartitionUtils.getAllPartitionFields(table);
+            List<StructField> partitionStructFields = IcebergApiConverter.getPartitionColumns(partitionFields, table.schema());
+            StructType partitionType = new StructType(partitionStructFields, true);
+            builder.column("partition_value", partitionType);
+            builder.column("spec_id", createType(PrimitiveType.INT));
+        }
+
+        return new IcebergPartitionsTable(catalogName,
+                ConnectorTableId.CONNECTOR_ID_GENERATOR.getNextId().asInt(),
+                TABLE_NAME,
+                Table.TableType.METADATA,
+                builder.column("record_count", createType(PrimitiveType.BIGINT))
+                        .column("file_count", createType(PrimitiveType.BIGINT))
+                        .column("total_data_file_size_in_bytes", createType(PrimitiveType.BIGINT))
+                        .column("position_delete_record_count", createType(PrimitiveType.BIGINT))
+                        .column("position_delete_file_count", createType(PrimitiveType.BIGINT))
+                        .column("equality_delete_record_count", createType(PrimitiveType.BIGINT))
+                        .column("equality_delete_file_count", createType(PrimitiveType.BIGINT))
+                        .column("last_updated_at", createType(PrimitiveType.DATETIME))
+                        .build(),
+                originDb,
+                originTable,
+                MetadataTableType.PARTITIONS);
+    }
+
+    @Override
+    public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> partitions) {
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_PARTITIONS_TABLE,
+                fullSchema.size(), 0, getName(), METADATA_DB_NAME);
+        THdfsTable hdfsTable = buildThriftTable(fullSchema);
+        tTableDescriptor.setHdfsTable(hdfsTable);
+        return tTableDescriptor;
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return getColumn("partition_value") != null;
+    }
+
+    @Override
+    public boolean supportBuildPlan() {
+        return true;
+    }
+}
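For a table partitioned by (p1 int, p2 int) — the shape the tests at the end of this patch use — the builder above yields roughly this schema; partition_value and spec_id exist only when the table is partitioned:

    partition_value                STRUCT<p1 INT, p2 INT>
    spec_id                        INT
    record_count                   BIGINT
    file_count                     BIGINT
    total_data_file_size_in_bytes  BIGINT
    position_delete_record_count   BIGINT
    position_delete_file_count     BIGINT
    equality_delete_record_count   BIGINT
    equality_delete_file_count     BIGINT
    last_updated_at                DATETIME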
.column("equality_delete_record_count", createType(PrimitiveType.BIGINT)) + .column("equality_delete_file_count", createType(PrimitiveType.BIGINT)) + .column("last_updated_at", createType(PrimitiveType.DATETIME)) + .build(), + originDb, + originTable, + MetadataTableType.PARTITIONS); + } + + @Override + public TTableDescriptor toThrift(List partitions) { + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_PARTITIONS_TABLE, + fullSchema.size(), 0, getName(), METADATA_DB_NAME); + THdfsTable hdfsTable = buildThriftTable(fullSchema); + tTableDescriptor.setHdfsTable(hdfsTable); + return tTableDescriptor; + } + + @Override + public boolean isPartitioned() { + return getColumn("partition") != null; + } + + @Override + public boolean supportBuildPlan() { + return true; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java index c6cba052bde97d..6343f4e725c819 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Optimizer.java @@ -49,6 +49,7 @@ import com.starrocks.sql.optimizer.rule.transformation.EliminateAggRule; import com.starrocks.sql.optimizer.rule.transformation.ForceCTEReuseRule; import com.starrocks.sql.optimizer.rule.transformation.GroupByCountDistinctRewriteRule; +import com.starrocks.sql.optimizer.rule.transformation.IcebergPartitionsTableRewriteRule; import com.starrocks.sql.optimizer.rule.transformation.JoinLeftAsscomRule; import com.starrocks.sql.optimizer.rule.transformation.MaterializedViewTransparentRewriteRule; import com.starrocks.sql.optimizer.rule.transformation.MergeProjectWithChildRule; @@ -452,6 +453,7 @@ private OptExpression logicalRuleRewrite( CTEUtils.collectCteOperators(tree, context); } + ruleRewriteOnlyOnce(tree, rootTaskContext, new IcebergPartitionsTableRewriteRule()); ruleRewriteIterative(tree, rootTaskContext, RuleSetType.AGGREGATE_REWRITE); ruleRewriteIterative(tree, rootTaskContext, RuleSetType.PUSH_DOWN_SUBQUERY); ruleRewriteIterative(tree, rootTaskContext, RuleSetType.SUBQUERY_REWRITE_COMMON); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergMetadataScanOperator.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergMetadataScanOperator.java index 93c79909f0bb5a..868c5be81e8fb5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergMetadataScanOperator.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/operator/logical/LogicalIcebergMetadataScanOperator.java @@ -27,6 +27,7 @@ public class LogicalIcebergMetadataScanOperator extends LogicalScanOperator { private ScanOperatorPredicates predicates = new ScanOperatorPredicates(); + private boolean isTransformed; public LogicalIcebergMetadataScanOperator(Table table, Map colRefToColumnMetaMap, @@ -65,6 +66,14 @@ public void setScanOperatorPredicates(ScanOperatorPredicates predicates) { this.predicates = predicates; } + public boolean isTransformed() { + return isTransformed; + } + + public void setTransformed(boolean transformed) { + isTransformed = transformed; + } + @Override public R accept(OperatorVisitor visitor, C context) { return visitor.visitLogicalIcebergMetadataScan(this, context); @@ -82,6 +91,7 @@ protected LogicalIcebergMetadataScanOperator newInstance() { public LogicalIcebergMetadataScanOperator.Builder 
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/RuleType.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/RuleType.java
index 83c76e66012e23..fef569062c8369 100644
--- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/RuleType.java
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/RuleType.java
@@ -143,6 +143,7 @@ public enum RuleType {
     TF_REWRITE_PARTITION_COLUMN_ONLY_AGG,
     TF_REWRITE_SUM_BY_ASSOCIATIVE_RULE,
     TF_REWRITE_COUNT_IF_RULE,
+    TF_ICEBERG_PARTITIONS_TABLE_REWRITE_RULE,
 
     TF_INTERSECT_REORDER,
     TF_INTERSECT_DISTINCT,
diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/IcebergPartitionsTableRewriteRule.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/IcebergPartitionsTableRewriteRule.java
new file mode 100644
index 00000000000000..997ad5754bdd58
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/IcebergPartitionsTableRewriteRule.java
@@ -0,0 +1,116 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.sql.optimizer.rule.transformation;
+
+import com.google.common.collect.Lists;
+import com.starrocks.analysis.Expr;
+import com.starrocks.catalog.Column;
+import com.starrocks.catalog.Function;
+import com.starrocks.catalog.Type;
+import com.starrocks.connector.exception.StarRocksConnectorException;
+import com.starrocks.connector.metadata.MetadataTable;
+import com.starrocks.connector.metadata.MetadataTableType;
+import com.starrocks.sql.optimizer.OptExpression;
+import com.starrocks.sql.optimizer.OptimizerContext;
+import com.starrocks.sql.optimizer.operator.AggType;
+import com.starrocks.sql.optimizer.operator.Operator;
+import com.starrocks.sql.optimizer.operator.OperatorType;
+import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
+import com.starrocks.sql.optimizer.operator.logical.LogicalIcebergMetadataScanOperator;
+import com.starrocks.sql.optimizer.operator.pattern.Pattern;
+import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
+import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
+import com.starrocks.sql.optimizer.rule.RuleType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergPartitionsTableRewriteRule extends TransformationRule {
+    public IcebergPartitionsTableRewriteRule() {
+        super(RuleType.TF_ICEBERG_PARTITIONS_TABLE_REWRITE_RULE, Pattern.create(OperatorType.LOGICAL_ICEBERG_METADATA_SCAN));
+    }
+
+    @Override
+    public boolean check(final OptExpression input, OptimizerContext context) {
+        Operator op = input.getOp();
+        if (!(op instanceof LogicalIcebergMetadataScanOperator)) {
+            return false;
+        }
+        LogicalIcebergMetadataScanOperator scanOperator = op.cast();
+        if (scanOperator.isTransformed()) {
+            return false;
+        }
+        MetadataTable metadataTable = (MetadataTable) scanOperator.getTable();
+        return metadataTable.getMetadataTableType() == MetadataTableType.PARTITIONS;
+    }
+
+    @Override
+    public List<OptExpression> transform(OptExpression input, OptimizerContext context) {
+        LogicalIcebergMetadataScanOperator operator = input.getOp().cast();
+        List<ColumnRefOperator> groupingKeys = new ArrayList<>();
+        Map<ColumnRefOperator, CallOperator> aggregations = new HashMap<>();
+        for (Map.Entry<ColumnRefOperator, Column> entries : operator.getColRefToColumnMetaMap().entrySet()) {
+            ColumnRefOperator columnRefOperator = entries.getKey();
+            Column column = entries.getValue();
+            CallOperator agg = null;
+            Function fun;
+            Type[] argTypes = new Type[] {column.getType()};
+            String columnName = columnRefOperator.getName();
+            switch (columnName) {
+                case "partition_value":
+                    groupingKeys.add(columnRefOperator);
+                    break;
+                case "spec_id":
+                    fun = Expr.getBuiltinFunction("any_value", argTypes, Function.CompareMode.IS_IDENTICAL);
+                    agg = new CallOperator("any_value", Type.INT, Lists.newArrayList(columnRefOperator), fun);
+                    break;
+                case "record_count":
+                case "total_data_file_size_in_bytes":
+                case "position_delete_record_count":
+                case "equality_delete_record_count":
+                case "file_count":
+                case "position_delete_file_count":
+                case "equality_delete_file_count":
+                    fun = Expr.getBuiltinFunction("sum", argTypes, Function.CompareMode.IS_IDENTICAL);
+                    agg = new CallOperator("sum", Type.BIGINT, Lists.newArrayList(columnRefOperator), fun);
+                    break;
+                case "last_updated_at":
+                    fun = Expr.getBuiltinFunction("max", argTypes, Function.CompareMode.IS_IDENTICAL);
+                    agg = new CallOperator("max", Type.DATETIME, Lists.newArrayList(columnRefOperator), fun);
+                    break;
+                default:
+                    throw new StarRocksConnectorException("Unknown column name %s when rewriting " +
+                            "iceberg partitions table", columnName);
+            }
+
+            if (agg != null) {
+                aggregations.put(columnRefOperator, agg);
+            }
+        }
+
+        LogicalAggregationOperator agg = new LogicalAggregationOperator(AggType.GLOBAL, groupingKeys, aggregations);
+        LogicalIcebergMetadataScanOperator newScanOp = new LogicalIcebergMetadataScanOperator(
+                operator.getTable(),
+                operator.getColRefToColumnMetaMap(),
+                operator.getColumnMetaToColRefMap(),
+                operator.getLimit(),
+                operator.getPredicate());
+        newScanOp.setTransformed(true);
+        return Collections.singletonList(OptExpression.create(agg, OptExpression.create(newScanOp)));
+    }
+}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index b6eea653a8dac3..35ce762776a747 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -413,7 +413,8 @@ enum TTableType {
     ICEBERG_METADATA_LOG_ENTRIES_TABLE,
     ICEBERG_SNAPSHOTS_TABLE,
     ICEBERG_MANIFESTS_TABLE,
-    ICEBERG_FILES_TABLE
+    ICEBERG_FILES_TABLE,
+    ICEBERG_PARTITIONS_TABLE
 }
 
 enum TKeysType {
@@ -591,4 +592,4 @@ struct TSnapshotInfo {
 enum TTxnType {
     TXN_NORMAL = 0,
     TXN_REPLICATION = 1
-}
\ No newline at end of file
+}
diff --git a/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/IcebergPartitionUtils.java b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/IcebergPartitionUtils.java
new file mode 100644
index 00000000000000..d8dc00a99471b5
--- /dev/null
+++ b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/IcebergPartitionUtils.java
@@ -0,0 +1,55 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.connector.share.iceberg;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.Types;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergPartitionUtils {
+    public static List<PartitionField> getAllPartitionFields(Table icebergTable) {
+        Set<Integer> existingColumnsIds = icebergTable.schema()
+                .columns().stream()
+                .map(Types.NestedField::fieldId)
+                .collect(Collectors.toSet());
+
+        List<PartitionField> visiblePartitionFields = icebergTable.specs()
+                .values().stream()
+                .flatMap(partitionSpec -> partitionSpec.fields().stream())
+                .filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
+                .collect(Collectors.toList());
+
+        return filterDuplicates(visiblePartitionFields);
+    }
+
+    public static List<PartitionField> filterDuplicates(List<PartitionField> visiblePartitionFields) {
+        Set<Integer> existingFieldIds = new HashSet<>();
+        List<PartitionField> result = new ArrayList<>();
+        for (PartitionField partitionField : visiblePartitionFields) {
+            if (!existingFieldIds.contains(partitionField.fieldId())) {
+                existingFieldIds.add(partitionField.fieldId());
+                result.add(partitionField);
+            }
+        }
+        return result;
+    }
+}
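The switch in transform() pins exactly one aggregate onto each output column, and grouping happens only on partition_value; the scanner later emits one row per manifest entry, which these aggregates roll up. A condensed, runnable view of that mapping (pure illustration, mirrors the patch):

    import java.util.Map;

    public class PartitionsAggMapping {
        // column name -> aggregate applied on top of the per-manifest-entry rows
        static final Map<String, String> AGGS = Map.of(
                "spec_id", "any_value",
                "record_count", "sum",
                "file_count", "sum",
                "total_data_file_size_in_bytes", "sum",
                "position_delete_record_count", "sum",
                "position_delete_file_count", "sum",
                "equality_delete_record_count", "sum",
                "equality_delete_file_count", "sum",
                "last_updated_at", "max");

        public static void main(String[] args) {
            AGGS.forEach((col, fn) -> System.out.println(fn + "(" + col + ")"));
        }
    }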
diff --git a/java-extensions/iceberg-metadata-reader/pom.xml b/java-extensions/iceberg-metadata-reader/pom.xml
index 63e2520d08c088..51882cd1f0c84f 100644
--- a/java-extensions/iceberg-metadata-reader/pom.xml
+++ b/java-extensions/iceberg-metadata-reader/pom.xml
@@ -90,6 +90,18 @@
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
             <version>${iceberg.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
         </dependency>
 
diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergFilesTableScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergFilesTableScanner.java
index 33e3a56ff67833..67cf09a59a0db8 100644
--- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergFilesTableScanner.java
+++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergFilesTableScanner.java
@@ -73,7 +73,7 @@ public int doGetNext() {
             if (fieldData == null) {
                 appendData(i, null);
             } else {
-                ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData);
+                ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData, timezone);
                 appendData(i, fieldValue);
             }
         }
diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergManifestsTableScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergManifestsTableScanner.java
index 416cf92aa75f0d..75b9630c25435a 100644
--- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergManifestsTableScanner.java
+++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergManifestsTableScanner.java
@@ -55,7 +55,7 @@ public int doGetNext() {
             if (fieldData == null) {
                 appendData(i, null);
             } else {
-                ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData);
+                ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData, timezone);
                 appendData(i, fieldValue);
             }
         }
diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataColumnValue.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataColumnValue.java
index b59a2e69a4a082..625cbdcfc5f641 100644
--- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataColumnValue.java
+++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataColumnValue.java
@@ -23,6 +23,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 
@@ -88,7 +89,7 @@ public void unpackArray(List<ColumnValue> values) {
         for (Object item : items) {
             IcebergMetadataColumnValue cv = null;
             if (item != null) {
-                cv = new IcebergMetadataColumnValue(item);
+                cv = new IcebergMetadataColumnValue(item, timezone);
             }
             values.add(cv);
         }
@@ -106,9 +107,12 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
     @Override
     public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
         GenericRecord record = (GenericRecord) fieldData;
-        for (int i = 0; i < structFieldIndex.size(); i++) {
-            Integer idx = structFieldIndex.get(i);
-            IcebergMetadataColumnValue value = new IcebergMetadataColumnValue(record.get(idx));
+        for (Integer fieldIndex : structFieldIndex) {
+            IcebergMetadataColumnValue value = null;
+            Object rawValue = record.get(fieldIndex);
+            if (rawValue != null) {
+                value = new IcebergMetadataColumnValue(rawValue, timezone);
+            }
             values.add(value);
         }
     }
@@ -125,6 +129,9 @@ public BigDecimal getDecimal() {
 
     @Override
     public LocalDate getDate() {
+        if (fieldData instanceof Integer) {
+            return Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).plusDays((int) fieldData).toLocalDate();
+        }
         return null;
     }
diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScannerFactory.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScannerFactory.java
index 3384c5e4a5330a..84f2085ca36724 100644
--- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScannerFactory.java
+++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScannerFactory.java
@@ -61,6 +61,9 @@ public Class<?> getScannerClass(String scannerType) throws ClassNotFoundException {
         case "files":
             loadClass = "com.starrocks.connector.iceberg.IcebergFilesTableScanner";
             break;
+        case "partitions":
+            loadClass = "com.starrocks.connector.iceberg.IcebergPartitionsTableScanner";
+            break;
         default:
             throw new IllegalArgumentException("unknown iceberg scanner type " + scannerType);
         }
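The new branch in getDate() rebuilds a LocalDate from Iceberg's DATE encoding, which is an integer count of days since the epoch; java.time has a direct equivalent, shown here only to document the encoding (the value 19945 is arbitrary):

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;

    public class EpochDayDemo {
        public static void main(String[] args) {
            int epochDays = 19945;                      // Iceberg DATE values are days since 1970-01-01
            LocalDate viaOffset = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC)
                    .plusDays(epochDays).toLocalDate(); // the patch's formulation
            LocalDate direct = LocalDate.ofEpochDay(epochDays);
            System.out.println(viaOffset.equals(direct)); // true
        }
    }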
diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergPartitionsTableScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergPartitionsTableScanner.java
new file mode 100644
index 00000000000000..097fa4c25e1e3e
--- /dev/null
+++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergPartitionsTableScanner.java
@@ -0,0 +1,181 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.connector.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.starrocks.connector.share.iceberg.IcebergPartitionUtils;
+import com.starrocks.jni.connector.ColumnValue;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iceberg.util.SerializationUtil.deserializeFromBase64;
+
+public class IcebergPartitionsTableScanner extends AbstractIcebergMetadataScanner {
+    protected static final List<String> SCAN_COLUMNS =
+            ImmutableList.of("content", "partition", "file_size_in_bytes", "record_count");
+
+    private final String manifestBean;
+    private ManifestFile manifestFile;
+    private CloseableIterator<ContentFile<?>> reader;
+    private List<PartitionField> partitionFields;
+    private Long lastUpdateTime;
+    private Integer specId;
+    private Schema schema;
+    private GenericRecord reusedRecord;
+
+    public IcebergPartitionsTableScanner(int fetchSize, Map<String, String> params) {
+        super(fetchSize, params);
+        this.manifestBean = params.get("split_info");
+    }
+
+    @Override
+    public void doOpen() {
+        this.manifestFile = deserializeFromBase64(manifestBean);
+        this.schema = table.schema();
+        this.specId = manifestFile.partitionSpecId();
+        this.lastUpdateTime = table.snapshot(manifestFile.snapshotId()) != null ?
+                table.snapshot(manifestFile.snapshotId()).timestampMillis() : null;
+        this.partitionFields = IcebergPartitionUtils.getAllPartitionFields(table);
+        this.reusedRecord = GenericRecord.create(getResultType());
+    }
+
+    @Override
+    public int doGetNext() {
+        int numRows = 0;
+        for (; numRows < getTableSize(); numRows++) {
+            if (!reader.hasNext()) {
+                break;
+            }
+            ContentFile<?> file = reader.next();
+            for (int i = 0; i < requiredFields.length; i++) {
+                Object fieldData = get(requiredFields[i], file);
+                if (fieldData == null) {
+                    appendData(i, null);
+                } else {
+                    ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData, timezone);
+                    appendData(i, fieldValue);
+                }
+            }
+        }
+        return numRows;
+    }
+
+    @Override
+    public void doClose() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+        reusedRecord = null;
+    }
+
+    @Override
+    protected void initReader() {
+        Map<Integer, PartitionSpec> specs = table.specs();
+        if (manifestFile.content() == ManifestContent.DATA) {
+            reader = ManifestFiles.read(manifestFile, table.io(), specs)
+                    .select(SCAN_COLUMNS)
+                    .caseSensitive(false)
+                    .iterator();
+        } else {
+            reader = ManifestFiles.readDeleteManifest(manifestFile, table.io(), specs)
+                    .select(SCAN_COLUMNS)
+                    .caseSensitive(false)
+                    .iterator();
+        }
+    }
+
+    private Types.StructType getResultType() {
+        List<Types.NestedField> fields = new ArrayList<>();
+        for (PartitionField partitionField : partitionFields) {
+            int id = partitionField.fieldId();
+            String name = partitionField.name();
+            Type type = partitionField.transform().getResultType(schema.findType(partitionField.sourceId()));
+            Types.NestedField nestedField = Types.NestedField.optional(id, name, type);
+            fields.add(nestedField);
+        }
+        return Types.StructType.of(fields);
+    }
+
+    private Object get(String columnName, ContentFile<?> file) {
+        FileContent content = file.content();
+        switch (columnName) {
+            case "partition_value":
+                return getPartitionValues((PartitionData) file.partition());
+            case "spec_id":
+                return specId;
+            case "record_count":
+                return content == FileContent.DATA ? file.recordCount() : 0L;
+            case "file_count":
+                return content == FileContent.DATA ? 1L : 0L;
+            case "total_data_file_size_in_bytes":
+                return content == FileContent.DATA ? file.fileSizeInBytes() : 0L;
+            case "position_delete_record_count":
+                return content == FileContent.POSITION_DELETES ? file.recordCount() : 0L;
+            case "position_delete_file_count":
+                return content == FileContent.POSITION_DELETES ? 1L : 0L;
+            case "equality_delete_record_count":
+                return content == FileContent.EQUALITY_DELETES ? file.recordCount() : 0L;
+            case "equality_delete_file_count":
+                return content == FileContent.EQUALITY_DELETES ? 1L : 0L;
+            case "last_updated_at":
+                return lastUpdateTime;
+            default:
+                throw new IllegalArgumentException("Unrecognized column name " + columnName);
+        }
+    }
+
+    private Object getPartitionValues(PartitionData partitionData) {
+        List<Types.NestedField> fileFields = partitionData.getPartitionType().fields();
+        Map<Integer, Integer> fieldIdToPos = new HashMap<>();
+        for (int i = 0; i < fileFields.size(); i++) {
+            fieldIdToPos.put(fileFields.get(i).fieldId(), i);
+        }
+
+        for (PartitionField partitionField : partitionFields) {
+            Integer fieldId = partitionField.fieldId();
+            String name = partitionField.name();
+            if (fieldIdToPos.containsKey(fieldId)) {
+                int pos = fieldIdToPos.get(fieldId);
+                Type fieldType = partitionData.getType(pos);
+                Object partitionValue = partitionData.get(pos);
+                if (partitionField.transform().isIdentity() && Types.TimestampType.withZone().equals(fieldType)) {
+                    partitionValue = ((long) partitionValue) / 1000;
+                }
+                reusedRecord.setField(name, partitionValue);
+            } else {
+                reusedRecord.setField(name, null);
+            }
+        }
+        return reusedRecord;
+    }
+}
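The scanner emits one row per manifest entry, with every counter bucketed by the entry's FileContent; the aggregation injected by IcebergPartitionsTableRewriteRule then sums those rows per partition. A compact stand-alone model of that two-stage accounting (the enum and record here are stand-ins, not the patch's classes):

    import java.util.List;

    public class PartitionRollupDemo {
        enum FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES } // mirrors org.apache.iceberg.FileContent
        record FileEntry(FileContent content, long recordCount, long sizeInBytes) { }

        public static void main(String[] args) {
            // Two data files and one positional-delete file in the same partition.
            List<FileEntry> files = List.of(
                    new FileEntry(FileContent.DATA, 100, 4096),
                    new FileEntry(FileContent.DATA, 50, 2048),
                    new FileEntry(FileContent.POSITION_DELETES, 5, 512));

            long recordCount = files.stream()
                    .filter(f -> f.content() == FileContent.DATA).mapToLong(FileEntry::recordCount).sum();
            long fileCount = files.stream()
                    .filter(f -> f.content() == FileContent.DATA).count();
            long posDeleteRecords = files.stream()
                    .filter(f -> f.content() == FileContent.POSITION_DELETES).mapToLong(FileEntry::recordCount).sum();

            System.out.println(recordCount + " " + fileCount + " " + posDeleteRecords); // 150 2 5
        }
    }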
diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergRefsTableScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergRefsTableScanner.java
index a9bd5656679f00..89570279b95792 100644
--- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergRefsTableScanner.java
+++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergRefsTableScanner.java
@@ -47,7 +47,7 @@ public int doGetNext() {
             if (fieldData == null) {
                 appendData(i, null);
             } else {
-                ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData);
+                ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData, timezone);
                 appendData(i, fieldValue);
             }
         }
diff --git a/java-extensions/pom.xml b/java-extensions/pom.xml
index 90a6f339190592..d76c4ba267582a 100644
--- a/java-extensions/pom.xml
+++ b/java-extensions/pom.xml
@@ -42,6 +42,7 @@
         1.0.0
         1.0.0
         1.0.0
+        <avro.version>1.11.3</avro.version>
diff --git a/test/sql/test_iceberg/R/test_metadata_table b/test/sql/test_iceberg/R/test_metadata_table
index f3f2fcff2c1d63..77494fa84b44d3 100644
--- a/test/sql/test_iceberg/R/test_metadata_table
+++ b/test/sql/test_iceberg/R/test_metadata_table
@@ -77,6 +77,25 @@ append
 append
 append
 -- !result
+create table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} (k1 int, p1 int, p2 int) partition by (p1, p2);
+-- result:
+-- !result
+insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} select 1,1,2;
+-- result:
+-- !result
+insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} select 3,3,4;
+-- result:
+-- !result
+insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} select 3,3,4;
+-- result:
+-- !result
+select record_count,file_count from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0}$partitions where partition_value.p2=4;
+-- result:
+2 2
+-- !result
+drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} force;
+-- result:
+-- !result
 drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} force;
 -- result:
 -- !result
@@ -85,4 +104,4 @@ drop database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0};
 -- !result
 drop catalog iceberg_sql_test_${uuid0};
 -- result:
--- !result
+-- !result
\ No newline at end of file
diff --git a/test/sql/test_iceberg/T/test_metadata_table b/test/sql/test_iceberg/T/test_metadata_table
index 2ca0fc4a517633..1c5d806b9f257c 100644
--- a/test/sql/test_iceberg/T/test_metadata_table
+++ b/test/sql/test_iceberg/T/test_metadata_table
@@ -17,12 +17,31 @@ select name, type from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${u
 select name, type from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$refs where name="test_branch_2" order by name;
 select name, type from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$refs where type="TAG" order by name;
 
+-- iceberg history table
 select is_current_ancestor from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$history where parent_id is not null order by is_current_ancestor;
+
+-- iceberg metadata log entries table
 select latest_schema_id from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$metadata_log_entries where latest_schema_id is not null order by latest_schema_id;
 select latest_schema_id from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$metadata_log_entries where latest_schema_id is not null limit 1;
+
+-- iceberg manifests table
 select added_data_files_count,added_rows_count,existing_data_files_count,partitions from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$manifests order by added_data_files_count;
+
+-- iceberg files table
 select record_count,lower_bounds from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$files order by element_at(lower_bounds, 1);
+
+-- iceberg snapshots table
 select operation from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}$snapshots order by operation;
+
+-- iceberg partitions table
+create table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} (k1 int, p1 int, p2 int) partition by (p1, p2);
+insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} select 1,1,2;
+insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} select 3,3,4;
+insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} select 3,3,4;
+
+select record_count,file_count from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0}$partitions where partition_value.p2=4;
+
+drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0}_${uuid0} force;
 drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} force;
 drop database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0};
 drop catalog iceberg_sql_test_${uuid0};