[AMORO-3424] Add the self-optimizing.partition-filter parameter #3426

Merged · 10 commits · Mar 12, 2025
@@ -294,6 +294,11 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> properties)
properties,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT))
.setFilter(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.SELF_OPTIMIZING_FILTER,
TableProperties.SELF_OPTIMIZING_FILTER_DEFAULT))
.setBaseHashBucket(
CompatiblePropertyUtil.propertyAsInt(
properties,
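For illustration only, a minimal sketch (not part of the PR) of how the new property flows through this parser, assuming parseOptimizingConfig can be called directly on a raw property map:

    // Hypothetical usage; the property key and parser are from this PR, the rest is illustrative.
    Map<String, String> props = new HashMap<>();
    props.put(TableProperties.SELF_OPTIMIZING_FILTER, "op_time >= '2022-01-01T12:00:00'");
    OptimizingConfig config = parseOptimizingConfig(props);
    // config.getFilter() now carries the raw SQL predicate string,
    // which the evaluator later converts to an Iceberg expression.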
@@ -33,6 +33,7 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableProperties;
import org.apache.amoro.table.TableSnapshot;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.StructLike;
@@ -121,6 +122,83 @@ public void testFragmentFiles() {
assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles));
}

@Test
public void testFragmentFilesWithPartitionFilterTimeStamp() {
getMixedTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_FILTER, "op_time >= '2022-01-01T12:00:00'")
.commit();
testFragmentFilesWithPartitionFilterDo(true);

getMixedTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_FILTER, "op_time > '2022-01-01T12:00:00'")
.commit();
testFragmentFilesWithPartitionFilterDo(false);
}

@Test
public void testFragmentFilesWithPartitionFilterInteger() {
getMixedTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_FILTER, "id > 0")
.commit();
testFragmentFilesWithPartitionFilterDo(true);

getMixedTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_FILTER, "id > 8")
.commit();
testFragmentFilesWithPartitionFilterDo(false);
}

@Test
public void testFragmentFilesWithPartitionFilterString() {
getMixedTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_FILTER, "name > '0'")
.commit();
testFragmentFilesWithPartitionFilterDo(true);

getMixedTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_FILTER, "name > '8'")
.commit();
testFragmentFilesWithPartitionFilterDo(false);
}

private void testFragmentFilesWithPartitionFilterDo(boolean isNecessary) {
closeFullOptimizingInterval();
updateBaseHashBucket(1);
List<DataFile> dataFiles = Lists.newArrayList();
List<Record> newRecords =
OptimizingTestHelpers.generateRecord(tableTestHelper(), 1, 4, "2022-01-01T12:00:00");
long transactionId = beginTransaction();
dataFiles.addAll(
OptimizingTestHelpers.appendBase(
getMixedTable(),
tableTestHelper().writeBaseStore(getMixedTable(), transactionId, newRecords, false)));

// add more files
newRecords =
OptimizingTestHelpers.generateRecord(tableTestHelper(), 5, 8, "2022-01-01T12:00:00");
transactionId = beginTransaction();
dataFiles.addAll(
OptimizingTestHelpers.appendBase(
getMixedTable(),
tableTestHelper().writeBaseStore(getMixedTable(), transactionId, newRecords, false)));

AbstractOptimizingEvaluator optimizingEvaluator = buildOptimizingEvaluator();
if (isNecessary) {
Assert.assertTrue(optimizingEvaluator.isNecessary());
AbstractOptimizingEvaluator.PendingInput pendingInput =
optimizingEvaluator.getOptimizingPendingInput();
assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles));
} else {
Assert.assertFalse(optimizingEvaluator.isNecessary());
}
}
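The three tests above share a threshold pattern: this helper writes ids 1 through 8, all with op_time fixed at 2022-01-01T12:00:00, so the inclusive or low-threshold filters (op_time >=, id > 0, name > '0') select the written files and optimizing is deemed necessary, while the strict or high-threshold variants (op_time >, id > 8, name > '8') select nothing and the evaluator reports no pending work.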

protected AbstractOptimizingEvaluator buildOptimizingEvaluator() {
TableSnapshot snapshot = IcebergTableUtil.getSnapshot(getMixedTable(), tableRuntime);
return IcebergTableUtil.createOptimizingEvaluator(tableRuntime, getMixedTable(), snapshot, 100);
@@ -74,6 +74,9 @@ public class OptimizingConfig {
// self-optimizing.full.rewrite-all-files
private boolean fullRewriteAllFiles;

// self-optimizing.filter
private String filter;

// base.file-index.hash-bucket
private int baseHashBucket;

@@ -240,6 +243,15 @@ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) {
return this;
}

public OptimizingConfig setFilter(String filter) {
this.filter = filter;
return this;
}

public String getFilter() {
return filter;
}

public int getBaseHashBucket() {
return baseHashBucket;
}
@@ -291,6 +303,7 @@ public boolean equals(Object o) {
&& Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0
&& fullTriggerInterval == that.fullTriggerInterval
&& fullRewriteAllFiles == that.fullRewriteAllFiles
&& Objects.equal(filter, that.filter)
&& baseHashBucket == that.baseHashBucket
&& baseRefreshInterval == that.baseRefreshInterval
&& hiveRefreshInterval == that.hiveRefreshInterval
@@ -317,6 +330,7 @@ public int hashCode() {
majorDuplicateRatio,
fullTriggerInterval,
fullRewriteAllFiles,
filter,
baseHashBucket,
baseRefreshInterval,
hiveRefreshInterval,
@@ -341,6 +355,7 @@ public String toString() {
.add("majorDuplicateRatio", majorDuplicateRatio)
.add("fullTriggerInterval", fullTriggerInterval)
.add("fullRewriteAllFiles", fullRewriteAllFiles)
.add("filter", filter)
.add("baseHashBucket", baseHashBucket)
.add("baseRefreshInterval", baseRefreshInterval)
.add("hiveRefreshInterval", hiveRefreshInterval)
5 changes: 5 additions & 0 deletions amoro-format-iceberg/pom.xml
@@ -154,6 +154,11 @@
<artifactId>paimon-bundle</artifactId>
</dependency>

<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
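The jsqlparser dependency added here is what parses the SQL predicate string; see the ExpressionUtil.convertSqlFilterToIcebergExpression call in the evaluator changes below.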
@@ -32,12 +32,12 @@
import org.apache.amoro.table.KeyedTableSnapshot;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableSnapshot;
import org.apache.amoro.utils.ExpressionUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
@@ -111,7 +111,8 @@ protected void initEvaluator() {
}

protected Expression getPartitionFilter() {
return Expressions.alwaysTrue();
return ExpressionUtil.convertSqlFilterToIcebergExpression(
config.getFilter(), mixedTable.schema().columns());
}

private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
@@ -142,7 +143,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
.limit(maxPendingPartitions)
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

protected abstract PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> partition);
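ExpressionUtil.convertSqlFilterToIcebergExpression is introduced by this PR, but its body is outside this diff. Below is a minimal sketch of the idea, assuming JSqlParser (the dependency added above) handles parsing and covering only > and >= comparisons; the actual utility presumably supports more operators and schema-aware literal conversion (the columns argument above suggests it binds literals to column types):

    import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
    import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
    import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
    import net.sf.jsqlparser.parser.CCJSqlParserUtil;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class SqlFilterSketch {
      // Hypothetical stand-in for ExpressionUtil.convertSqlFilterToIcebergExpression.
      public static Expression toIcebergExpression(String sqlFilter) throws Exception {
        if (sqlFilter == null) {
          // No filter configured: keep the previous match-all behavior.
          return Expressions.alwaysTrue();
        }
        net.sf.jsqlparser.expression.Expression parsed =
            CCJSqlParserUtil.parseCondExpression(sqlFilter);
        if (parsed instanceof ComparisonOperator) {
          ComparisonOperator cmp = (ComparisonOperator) parsed;
          String column = cmp.getLeftExpression().toString();
          // JSqlParser keeps quotes around string literals; strip them, and
          // fall back to a numeric value when the literal parses as one.
          String text = cmp.getRightExpression().toString().replace("'", "");
          Object value;
          try {
            value = Long.parseLong(text);
          } catch (NumberFormatException e) {
            value = text;
          }
          if (cmp instanceof GreaterThan) {
            return Expressions.greaterThan(column, value);
          }
          if (cmp instanceof GreaterThanEquals) {
            return Expressions.greaterThanOrEqual(column, value);
          }
        }
        throw new UnsupportedOperationException("Unsupported filter: " + sqlFilter);
      }
    }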
@@ -31,6 +31,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,6 +111,10 @@ public Map<String, Long> getToSequence() {

@Override
protected Expression getPartitionFilter() {
if (Expressions.alwaysTrue().equals(partitionFilter)
&& !Expressions.alwaysTrue().equals(super.getPartitionFilter())) {
return super.getPartitionFilter();
}
return partitionFilter;
}

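Note the precedence this override establishes: the table-level SQL filter (surfaced via super.getPartitionFilter()) only applies when this plan's own partitionFilter is still the trivial alwaysTrue; a partition filter that was already narrowed elsewhere takes priority over the configured property.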
@@ -115,6 +115,9 @@ private TableProperties() {}
"self-optimizing.full.rewrite-all-files";
public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true;

public static final String SELF_OPTIMIZING_FILTER = "self-optimizing.filter";
public static final String SELF_OPTIMIZING_FILTER_DEFAULT = null;

public static final String SELF_OPTIMIZING_MIN_PLAN_INTERVAL =
"self-optimizing.min-plan-interval";
public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;
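For reference, the tests in this PR turn the feature on through the table properties API:

    getMixedTable()
        .updateProperties()
        .set(TableProperties.SELF_OPTIMIZING_FILTER, "op_time >= '2022-01-01T12:00:00'")
        .commit();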