Commit 69829a4

[Feature] support iceberg partitions table (StarRocks#49455)

Signed-off-by: stephen <stephen5217@163.com>
stephen-shelby authored Aug 9, 2024
1 parent c8bd900 commit 69829a4

Showing 22 changed files with 569 additions and 13 deletions.
3 changes: 2 additions & 1 deletion be/src/runtime/descriptors.cpp
@@ -741,7 +741,8 @@ Status DescriptorTbl::create(RuntimeState* state, ObjectPool* pool, const TDescr
case TTableType::ICEBERG_METADATA_LOG_ENTRIES_TABLE:
case TTableType::ICEBERG_SNAPSHOTS_TABLE:
case TTableType::ICEBERG_MANIFESTS_TABLE:
- case TTableType::ICEBERG_FILES_TABLE: {
+ case TTableType::ICEBERG_FILES_TABLE:
+ case TTableType::ICEBERG_PARTITIONS_TABLE: {
desc = pool->add(new IcebergMetadataTableDescriptor(tdesc, pool));
break;
}
@@ -399,4 +399,25 @@ private static String toPartitionField(PartitionSpec spec, PartitionField field)

throw new StarRocksConnectorException("Unsupported partition transform: " + field);
}

public static List<StructField> getPartitionColumns(List<PartitionField> fields, Schema schema) {
if (fields.isEmpty()) {
return Lists.newArrayList();
}

List<StructField> partitionColumns = Lists.newArrayList();
for (PartitionField field : fields) {
Type srType;
org.apache.iceberg.types.Type icebergType = field.transform().getResultType(schema.findType(field.sourceId()));
try {
srType = fromIcebergType(icebergType);
} catch (InternalError | Exception e) {
LOG.error("Failed to convert iceberg type {}", icebergType, e);
throw new StarRocksConnectorException("Failed to convert iceberg type %s", icebergType);
}
StructField column = new StructField(field.name(), srType);
partitionColumns.add(column);
}
return partitionColumns;
}
}
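Where these struct field types come from: Iceberg reports each partition column's type as the transform's result type, not the source column's type. A minimal standalone sketch against the Iceberg API (the schema and spec here are illustrative, not from this patch):

    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class TransformResultTypeSketch {
        public static void main(String[] args) {
            // Illustrative schema: a timestamptz column, partitioned by month(ts).
            Schema schema = new Schema(
                    Types.NestedField.required(1, "ts", Types.TimestampType.withZone()));
            PartitionSpec spec = PartitionSpec.builderFor(schema).month("ts").build();

            for (PartitionField field : spec.fields()) {
                // month(ts) has an INT result type even though the source column is
                // a timestamp; this is the icebergType that getPartitionColumns()
                // hands to fromIcebergType().
                org.apache.iceberg.types.Type resultType =
                        field.transform().getResultType(schema.findType(field.sourceId()));
                System.out.println(field.name() + " -> " + resultType); // ts_month -> int
            }
        }
    }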
@@ -665,7 +665,7 @@ public SerializedMetaSpec getSerializedMetaSpec(String dbName, String tableName,

List<ManifestFile> deleteManifests = snapshot.deleteManifests(nativeTable.io());
List<ManifestFile> matchingDeleteManifests = filterManifests(deleteManifests, nativeTable, predicate);
- if (metadataTableType == MetadataTableType.FILES) {
+ if (metadataTableType == MetadataTableType.FILES || metadataTableType == MetadataTableType.PARTITIONS) {
for (ManifestFile file : matchingDeleteManifests) {
remoteMetaSplits.add(IcebergMetaSplit.from(file));
}
@@ -21,7 +21,8 @@ public enum MetadataTableType {
METADATA_LOG_ENTRIES("metadata_log_entries"),
SNAPSHOTS("snapshots"),
MANIFESTS("manifests"),
FILES("files");
FILES("files"),
PARTITIONS("partitions");

public final String typeString;

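Each constant's typeString is the user-facing table-name suffix. A hypothetical lookup helper (not part of this commit) shows how a suffix such as "partitions" would resolve to the enum:

    import java.util.Arrays;
    import java.util.Optional;

    // Hypothetical helper, not in this patch: map a metadata-table suffix
    // (e.g. "partitions") back to its MetadataTableType via typeString.
    static Optional<MetadataTableType> fromTypeString(String suffix) {
        return Arrays.stream(MetadataTableType.values())
                .filter(t -> t.typeString.equalsIgnoreCase(suffix))
                .findFirst();
    }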
@@ -42,6 +42,8 @@ public Table createTable(String catalogName, String dbName, String tableName, Me
return IcebergManifestsTable.create(catalogName, dbName, tableName);
case FILES:
return IcebergFilesTable.create(catalogName, dbName, tableName);
case PARTITIONS:
return IcebergPartitionsTable.create(catalogName, dbName, tableName);
default:
LOG.error("Unrecognized iceberg metadata table type {}", tableType);
throw new StarRocksConnectorException("Unrecognized iceberg metadata table type %s", tableType);
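With this change, the factory's PARTITIONS branch returns the new table type. A hypothetical invocation of the method above (catalog, database, and table names are illustrative):

    // Hypothetical usage of the factory switch above; names are illustrative.
    Table partitionsTable = createTable("iceberg_catalog", "sales_db", "orders",
            MetadataTableType.PARTITIONS);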
@@ -0,0 +1,104 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.metadata.iceberg;

import com.starrocks.analysis.DescriptorTable;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.StructField;
import com.starrocks.catalog.StructType;
import com.starrocks.catalog.Table;
import com.starrocks.connector.ConnectorTableId;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergApiConverter;
import com.starrocks.connector.metadata.MetadataTable;
import com.starrocks.connector.metadata.MetadataTableType;
import com.starrocks.connector.share.iceberg.IcebergPartitionUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.thrift.THdfsTable;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import org.apache.iceberg.PartitionField;

import java.util.List;

import static com.starrocks.catalog.ScalarType.createType;
import static com.starrocks.connector.metadata.TableMetaMetadata.METADATA_DB_NAME;

public class IcebergPartitionsTable extends MetadataTable {
public static final String TABLE_NAME = "iceberg_partitions_table";

public IcebergPartitionsTable(String catalogName, long id, String name, Table.TableType type,
List<Column> baseSchema, String originDb, String originTable,
MetadataTableType metadataTableType) {
super(catalogName, id, name, type, baseSchema, originDb, originTable, metadataTableType);
}

public static IcebergPartitionsTable create(String catalogName, String originDb, String originTable) {
Table icebergTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(catalogName, originDb, originTable);
if (icebergTable == null) {
throw new StarRocksConnectorException("table [%s.%s.%s] does not exist", catalogName, originDb, originTable);
}
if (!icebergTable.isIcebergTable()) {
throw new StarRocksConnectorException("table [%s.%s.%s] isn't an iceberg table", catalogName, originDb, originTable);
}
org.apache.iceberg.Table table = ((IcebergTable) icebergTable).getNativeTable();
Builder builder = builder();
if (table.spec().isPartitioned()) {
List<PartitionField> partitionFields = IcebergPartitionUtils.getAllPartitionFields(table);
List<StructField> partitionStructFields = IcebergApiConverter.getPartitionColumns(partitionFields, table.schema());
StructType partitionType = new StructType(partitionStructFields, true);
builder.column("partition_value", partitionType);
builder.column("spec_id", createType(PrimitiveType.INT));
}

return new IcebergPartitionsTable(catalogName,
ConnectorTableId.CONNECTOR_ID_GENERATOR.getNextId().asInt(),
TABLE_NAME,
Table.TableType.METADATA,
builder.column("record_count", createType(PrimitiveType.BIGINT))
.column("file_count", createType(PrimitiveType.BIGINT))
.column("total_data_file_size_in_bytes", createType(PrimitiveType.BIGINT))
.column("position_delete_record_count", createType(PrimitiveType.BIGINT))
.column("position_delete_file_count", createType(PrimitiveType.BIGINT))
.column("equality_delete_record_count", createType(PrimitiveType.BIGINT))
.column("equality_delete_file_count", createType(PrimitiveType.BIGINT))
.column("last_updated_at", createType(PrimitiveType.DATETIME))
.build(),
originDb,
originTable,
MetadataTableType.PARTITIONS);
}

@Override
public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> partitions) {
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_PARTITIONS_TABLE,
fullSchema.size(), 0, getName(), METADATA_DB_NAME);
THdfsTable hdfsTable = buildThriftTable(fullSchema);
tTableDescriptor.setHdfsTable(hdfsTable);
return tTableDescriptor;
}

@Override
public boolean isPartitioned() {
return getColumn("partition") != null;
}

@Override
public boolean supportBuildPlan() {
return true;
}
}
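To make the resulting schema concrete: assuming a hypothetical Iceberg table partitioned by identity(dt) and bucket(16, id), the builder above yields a partition_value struct over the transformed fields, followed by the fixed metric columns. A sketch using the same StarRocks catalog types the class already imports:

    // Illustrative only (hypothetical partition spec, not from this commit).
    // identity(dt) keeps the source type; bucket(16, id) produces an INT field
    // named id_bucket, so partition_value is STRUCT<dt DATE, id_bucket INT>.
    List<StructField> fields = Lists.newArrayList(
            new StructField("dt", ScalarType.createType(PrimitiveType.DATE)),
            new StructField("id_bucket", ScalarType.createType(PrimitiveType.INT)));
    StructType partitionValue = new StructType(fields, true);
    // record_count, file_count, total_data_file_size_in_bytes, the delete
    // counters, and last_updated_at are then appended as fixed columns.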
@@ -49,6 +49,7 @@
import com.starrocks.sql.optimizer.rule.transformation.EliminateAggRule;
import com.starrocks.sql.optimizer.rule.transformation.ForceCTEReuseRule;
import com.starrocks.sql.optimizer.rule.transformation.GroupByCountDistinctRewriteRule;
import com.starrocks.sql.optimizer.rule.transformation.IcebergPartitionsTableRewriteRule;
import com.starrocks.sql.optimizer.rule.transformation.JoinLeftAsscomRule;
import com.starrocks.sql.optimizer.rule.transformation.MaterializedViewTransparentRewriteRule;
import com.starrocks.sql.optimizer.rule.transformation.MergeProjectWithChildRule;
@@ -452,6 +453,7 @@ private OptExpression logicalRuleRewrite(
CTEUtils.collectCteOperators(tree, context);
}

ruleRewriteOnlyOnce(tree, rootTaskContext, new IcebergPartitionsTableRewriteRule());
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.AGGREGATE_REWRITE);
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.PUSH_DOWN_SUBQUERY);
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.SUBQUERY_REWRITE_COMMON);
@@ -27,6 +27,7 @@

public class LogicalIcebergMetadataScanOperator extends LogicalScanOperator {
private ScanOperatorPredicates predicates = new ScanOperatorPredicates();
private boolean isTransformed;

public LogicalIcebergMetadataScanOperator(Table table,
Map<ColumnRefOperator, Column> colRefToColumnMetaMap,
@@ -65,6 +66,14 @@ public void setScanOperatorPredicates(ScanOperatorPredicates predicates) {
this.predicates = predicates;
}

public boolean isTransformed() {
return isTransformed;
}

public void setTransformed(boolean transformed) {
isTransformed = transformed;
}

@Override
public <R, C> R accept(OperatorVisitor<R, C> visitor, C context) {
return visitor.visitLogicalIcebergMetadataScan(this, context);
@@ -82,6 +91,7 @@ protected LogicalIcebergMetadataScanOperator newInstance() {
public LogicalIcebergMetadataScanOperator.Builder withOperator(LogicalIcebergMetadataScanOperator scanOperator) {
super.withOperator(scanOperator);
builder.predicates = scanOperator.predicates;
builder.isTransformed = scanOperator.isTransformed;
return this;
}
}
@@ -143,6 +143,7 @@ public enum RuleType {
TF_REWRITE_PARTITION_COLUMN_ONLY_AGG,
TF_REWRITE_SUM_BY_ASSOCIATIVE_RULE,
TF_REWRITE_COUNT_IF_RULE,
TF_ICEBERG_PARTITIONS_TABLE_REWRITE_RULE,

TF_INTERSECT_REORDER,
TF_INTERSECT_DISTINCT,
@@ -0,0 +1,116 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql.optimizer.rule.transformation;

import com.google.common.collect.Lists;
import com.starrocks.analysis.Expr;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.Type;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.metadata.MetadataTable;
import com.starrocks.connector.metadata.MetadataTableType;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.operator.AggType;
import com.starrocks.sql.optimizer.operator.Operator;
import com.starrocks.sql.optimizer.operator.OperatorType;
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalIcebergMetadataScanOperator;
import com.starrocks.sql.optimizer.operator.pattern.Pattern;
import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.rule.RuleType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IcebergPartitionsTableRewriteRule extends TransformationRule {
public IcebergPartitionsTableRewriteRule() {
super(RuleType.TF_ICEBERG_PARTITIONS_TABLE_REWRITE_RULE, Pattern.create(OperatorType.LOGICAL_ICEBERG_METADATA_SCAN));
}

@Override
public boolean check(final OptExpression input, OptimizerContext context) {
Operator op = input.getOp();
if (!(op instanceof LogicalIcebergMetadataScanOperator)) {
return false;
}
LogicalIcebergMetadataScanOperator scanOperator = op.cast();
if (scanOperator.isTransformed()) {
return false;
}
MetadataTable metadataTable = (MetadataTable) scanOperator.getTable();
return metadataTable.getMetadataTableType() == MetadataTableType.PARTITIONS;
}

@Override
public List<OptExpression> transform(OptExpression input, OptimizerContext context) {
LogicalIcebergMetadataScanOperator operator = input.getOp().cast();
List<ColumnRefOperator> groupingKeys = new ArrayList<>();
Map<ColumnRefOperator, CallOperator> aggregations = new HashMap<>();
for (Map.Entry<ColumnRefOperator, Column> entries : operator.getColRefToColumnMetaMap().entrySet()) {
ColumnRefOperator columnRefOperator = entries.getKey();
Column column = entries.getValue();
CallOperator agg = null;
Function fun;
Type[] argTypes = new Type[] {column.getType()};
String columnName = columnRefOperator.getName();
switch (columnName) {
case "partition_value":
groupingKeys.add(columnRefOperator);
break;
case "spec_id":
fun = Expr.getBuiltinFunction("any_value", argTypes, Function.CompareMode.IS_IDENTICAL);
agg = new CallOperator("any_value", Type.INT, Lists.newArrayList(columnRefOperator), fun);
break;
case "record_count":
case "total_data_file_size_in_bytes":
case "position_delete_record_count":
case "equality_delete_record_count":
case "file_count":
case "position_delete_file_count":
case "equality_delete_file_count":
fun = Expr.getBuiltinFunction("sum", argTypes, Function.CompareMode.IS_IDENTICAL);
agg = new CallOperator("sum", Type.BIGINT, Lists.newArrayList(columnRefOperator), fun);
break;
case "last_updated_at":
fun = Expr.getBuiltinFunction("max", argTypes, Function.CompareMode.IS_IDENTICAL);
agg = new CallOperator("max", Type.DATETIME, Lists.newArrayList(columnRefOperator), fun);
break;
default:
throw new StarRocksConnectorException("Unknown column name %s when rewriting " +
"iceberg partitions table", columnName);
}

if (agg != null) {
aggregations.put(columnRefOperator, agg);
}
}

LogicalAggregationOperator agg = new LogicalAggregationOperator(AggType.GLOBAL, groupingKeys, aggregations);
LogicalIcebergMetadataScanOperator newScanOp = new LogicalIcebergMetadataScanOperator(
operator.getTable(),
operator.getColRefToColumnMetaMap(),
operator.getColumnMetaToColRefMap(),
operator.getLimit(),
operator.getPredicate());
newScanOp.setTransformed(true);
return Collections.singletonList(OptExpression.create(agg, OptExpression.create(newScanOp)));
}
}
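Net effect: the metadata scan emits one row per data or delete file, and the injected global aggregation folds them into one row per partition — group by partition_value, sum the counters, any_value(spec_id), max(last_updated_at). A self-contained model of that fold in plain Java (record names are illustrative, not optimizer code):

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionFoldSketch {
        // One row per file, as the rewritten metadata scan emits them.
        record FileRow(String partitionValue, int specId, long recordCount, long updatedAtMillis) {}

        // One row per partition, as the injected aggregation produces them.
        record PartitionRow(int specId, long recordCount, long fileCount, long updatedAtMillis) {}

        static Map<String, PartitionRow> fold(Iterable<FileRow> rows) {
            Map<String, PartitionRow> out = new HashMap<>();
            for (FileRow r : rows) {
                out.merge(r.partitionValue(),
                        new PartitionRow(r.specId(), r.recordCount(), 1L, r.updatedAtMillis()),
                        (a, b) -> new PartitionRow(
                                a.specId(),                                   // any_value(spec_id)
                                a.recordCount() + b.recordCount(),            // sum(record_count)
                                a.fileCount() + b.fileCount(),                // file_count via sum
                                Math.max(a.updatedAtMillis(), b.updatedAtMillis()))); // max(last_updated_at)
            }
            return out;
        }
    }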
5 changes: 3 additions & 2 deletions gensrc/thrift/Types.thrift
@@ -413,7 +413,8 @@ enum TTableType {
ICEBERG_METADATA_LOG_ENTRIES_TABLE,
ICEBERG_SNAPSHOTS_TABLE,
ICEBERG_MANIFESTS_TABLE,
- ICEBERG_FILES_TABLE
+ ICEBERG_FILES_TABLE,
+ ICEBERG_PARTITIONS_TABLE
}

enum TKeysType {
@@ -591,4 +592,4 @@ struct TSnapshotInfo {
enum TTxnType {
TXN_NORMAL = 0,
TXN_REPLICATION = 1
- }
\ No newline at end of file
+ }
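End to end, the feature surfaces as a queryable partitions metadata table. The user-facing query syntax is not part of this commit; assuming it mirrors the existing metadata tables such as files (a $partitions suffix on the table name — verify against the StarRocks documentation for your version), a query might look like this JDBC sketch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class QueryPartitionsSketch {
        public static void main(String[] args) throws Exception {
            String jdbcUrl = "jdbc:mysql://fe-host:9030/"; // placeholder FE address
            // Assumed syntax, mirroring the existing "$files"-style metadata tables.
            String sql = "SELECT * FROM iceberg_catalog.sales_db.orders$partitions";
            try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.printf("%s: %d rows in %d files%n",
                            rs.getString("partition_value"), // struct rendered as text (assumption)
                            rs.getLong("record_count"),
                            rs.getLong("file_count"));
                }
            }
        }
    }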