Skip to content

Commit

Permalink
[Enhancement] Support querying Iceberg tables that have been REPLACE …
Browse files Browse the repository at this point in the history
…PARTITION FIELD (#37452)

Signed-off-by: LPL <lipenglin@apache.org>
  • Loading branch information
lipenglin authored Jan 3, 2024
1 parent 327e9d9 commit 582c371
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
39 changes: 36 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.starrocks.thrift.TTableType;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortField;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -151,10 +152,9 @@ public List<Column> getPartitionColumns() {
if (partitionColumns == null) {
List<PartitionField> identityPartitionFields = this.getNativeTable().spec().fields().stream().
filter(partitionField -> partitionField.transform().isIdentity()).collect(Collectors.toList());
partitionColumns = identityPartitionFields.stream().map(partitionField -> getColumn(partitionField.name()))
.collect(Collectors.toList());
partitionColumns = identityPartitionFields.stream().map(partitionField -> getColumn(getPartitionSourceName(
this.getNativeTable().schema(), partitionField))).collect(Collectors.toList());
}

return partitionColumns;
}
public List<Column> getPartitionColumnsIncludeTransformed() {
Expand Down Expand Up @@ -210,6 +210,39 @@ public boolean isV2Format() {
return ((BaseTable) getNativeTable()).operations().current().formatVersion() > 1;
}

/**
 * Resolves the name of the schema column that a partition field is derived from.
 *
 * <p>
 * Under Iceberg partition evolution, {@code org.apache.iceberg.PartitionField#name} is only the
 * name of the partition inside the table's partition spec; it is not the name of the source
 * column. For example, given the partition field
 * </p>
 * <pre>
 * {
 *   "source-id": 4,
 *   "field-id": 1000,
 *   "name": "ts_day",
 *   "transform": "day"
 * }
 * </pre>
 * <p>
 * the source column has id {@code 4} and name {@code ts}, while {@code PartitionField#name} is
 * {@code ts_day} and {@code PartitionField#fieldId} is {@code 1000}. The partition name defaults
 * to {@code columnName_transformName} but can be customized, so even for an identity transform
 * it is not guaranteed to match the schema column name (although in practice identity partition
 * names are rarely customized).
 * </p>
 * <p>
 * For that reason the table column is looked up by the partition's source id through
 * {@code org.apache.iceberg.Schema#findColumnName} rather than by the partition name.
 * </p>
 * <br>
 * refs:<br>
 * - https://iceberg.apache.org/spec/#partition-evolution<br>
 * - https://iceberg.apache.org/spec/#partition-specs<br>
 * - https://iceberg.apache.org/spec/#partition-transforms
 *
 * @param schema    the Iceberg schema in which the source column is looked up
 * @param partition the partition field whose source column name is wanted
 * @return the column name that {@code schema} reports for the partition's source id
 *         (NOTE(review): presumably {@code null} when the id is absent from the schema;
 *         confirm against the Iceberg API)
 */
public String getPartitionSourceName(Schema schema, PartitionField partition) {
    // Look the column up by source id, not by the partition's (customizable) name.
    int sourceColumnId = partition.sourceId();
    return schema.findColumnName(sourceColumnId);
}

@Override
public boolean isUnPartitioned() {
return ((BaseTable) getNativeTable()).operations().current().spec().isUnpartitioned();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
continue;
}

srTypes.add(icebergTable.getColumn(partitionField.name()).getType());
srTypes.add(icebergTable.getColumn(icebergTable.getPartitionSourceName(spec.schema(),
partitionField)).getType());
}

if (icebergTable.hasPartitionTransformedEvolution()) {
Expand Down

0 comments on commit 582c371

Please sign in to comment.