From 65906a8202dcc9cf9dafbfd7c95347cd1827f79b Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 4 Jul 2023 17:27:06 +0200 Subject: [PATCH] Arrow,Core,Spark: Remove functionality marked for removal in 1.4.0 --- .../parquet/VectorizedColumnIterator.java | 66 ----- .../parquet/VectorizedPageIterator.java | 115 -------- .../iceberg/MergingSnapshotProducer.java | 22 -- .../BaseDeleteOrphanFilesActionResult.java | 38 --- .../BaseDeleteReachableFilesActionResult.java | 92 ------ .../actions/BaseFileGroupRewriteResult.java | 67 ----- .../actions/BaseMigrateTableActionResult.java | 37 --- .../BaseRewriteDataFilesFileGroupInfo.java | 64 ----- .../actions/BaseRewriteDataFilesResult.java | 41 --- .../BaseRewriteManifestsActionResult.java | 53 ---- .../BaseSnapshotTableActionResult.java | 38 --- .../iceberg/actions/BinPackStrategy.java | 1 + .../RewritePositionDeleteStrategy.java | 74 ----- .../iceberg/actions/RewriteStrategy.java | 3 +- .../apache/iceberg/actions/SortStrategy.java | 1 + .../BaseDeleteOrphanFilesSparkAction.java | 4 +- .../BaseDeleteReachableFilesSparkAction.java | 12 +- .../actions/BaseMigrateTableSparkAction.java | 6 +- .../BaseRewriteDataFilesSparkAction.java | 19 +- .../BaseRewriteManifestsSparkAction.java | 11 +- .../actions/BaseSnapshotTableSparkAction.java | 6 +- .../actions/DeleteOrphanFilesSparkAction.java | 4 +- .../DeleteReachableFilesSparkAction.java | 17 +- .../actions/MigrateTableSparkAction.java | 6 +- .../actions/RewriteDataFilesSparkAction.java | 19 +- .../actions/RewriteManifestsSparkAction.java | 11 +- .../actions/SnapshotTableSparkAction.java | 6 +- .../spark/actions/SparkBinPackStrategy.java | 103 ------- .../spark/actions/SparkSortStrategy.java | 187 ------------ .../spark/actions/SparkZOrderStrategy.java | 268 ------------------ .../IcebergSortCompactionBenchmark.java | 30 +- .../actions/TestRewriteDataFilesAction.java | 59 ++-- 32 files changed, 121 insertions(+), 1359 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesFileGroupInfo.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseRewriteManifestsActionResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteStrategy.java delete mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java delete mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java delete mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index 73e332058e87..822ca8973f54 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -186,54 +186,6 @@ protected int nextBatchOf( } } - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - public class IntBackedDecimalBatchReader extends BatchReader { - @Override - protected int nextBatchOf( - final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, - final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .intBackedDecimalPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); - } - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - public class LongBackedDecimalBatchReader extends BatchReader { - @Override - protected int nextBatchOf( - final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, - final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .longBackedDecimalPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); - } - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - public class FixedLengthDecimalBatchReader extends BatchReader { - @Override - protected int nextBatchOf( - final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, - final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .fixedLengthDecimalPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); - } - } - public class FixedSizeBinaryBatchReader extends BatchReader { @Override protected int nextBatchOf( @@ -318,24 +270,6 @@ public DoubleBatchReader doubleBatchReader() { return new DoubleBatchReader(); } - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - public IntBackedDecimalBatchReader intBackedDecimalBatchReader() { - return new IntBackedDecimalBatchReader(); - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - public LongBackedDecimalBatchReader longBackedDecimalBatchReader() { - return new LongBackedDecimalBatchReader(); - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - public FixedLengthDecimalBatchReader fixedLengthDecimalBatchReader() { - return new FixedLengthDecimalBatchReader(); - } - public FixedSizeBinaryBatchReader fixedSizeBinaryBatchReader() { return new FixedSizeBinaryBatchReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index 0f5b297711e0..312cf03b36f8 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -19,7 +19,6 @@ package org.apache.iceberg.arrow.vectorized.parquet; import java.io.IOException; -import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarBinaryVector; @@ -367,102 +366,6 @@ private int getActualBatchSize(int expectedBatchSize) { return Math.min(expectedBatchSize, triplesCount - triplesRead); } - /** - * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since - * Arrow stores all decimals in 16 bytes, byte arrays are appropriately padded before being - * written to Arrow data buffers. - * - * @deprecated will be removed in 1.4.0 - */ - @Deprecated - class IntBackedDecimalPageReader extends BagePageReader { - @Override - protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { - vectorizedDefinitionLevelReader - .intBackedDecimalReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader); - } - - @Override - protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { - vectorizedDefinitionLevelReader - .intBackedDecimalReader() - .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); - } - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - class LongBackedDecimalPageReader extends BagePageReader { - @Override - protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { - vectorizedDefinitionLevelReader - .longBackedDecimalReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader); - } - - @Override - protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { - vectorizedDefinitionLevelReader - .longBackedDecimalReader() - .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); - } - } - - /** - * Method for reading a batch of decimals backed by fixed length byte array parquet data type. - * Arrow stores all decimals in 16 bytes. This method provides the necessary padding to the - * decimals read. Moreover, Arrow interprets the decimals in Arrow buffer as little endian. - * Parquet stores fixed length decimals as big endian. So, this method uses {@link - * DecimalVector#setBigEndian(int, byte[])} method so that the data in Arrow vector is indeed - * little endian. - * - * @deprecated will be removed in 1.4.0 - */ - @Deprecated - class FixedLengthDecimalPageReader extends BagePageReader { - @Override - protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { - vectorizedDefinitionLevelReader - .fixedLengthDecimalReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader); - } - - @Override - protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { - vectorizedDefinitionLevelReader - .fixedLengthDecimalReader() - .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); - } - } - class FixedSizeBinaryPageReader extends BagePageReader { @Override protected void nextVal( @@ -585,24 +488,6 @@ DoublePageReader doublePageReader() { return new DoublePageReader(); } - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - IntBackedDecimalPageReader intBackedDecimalPageReader() { - return new IntBackedDecimalPageReader(); - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - LongBackedDecimalPageReader longBackedDecimalPageReader() { - return new LongBackedDecimalPageReader(); - } - - /** @deprecated will be removed in 1.4.0 */ - @Deprecated - FixedLengthDecimalPageReader fixedLengthDecimalPageReader() { - return new FixedLengthDecimalPageReader(); - } - FixedSizeBinaryPageReader fixedSizeBinaryPageReader() { return new FixedSizeBinaryPageReader(); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 50270a05f3dd..eff4bb853f59 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -157,16 +157,6 @@ protected Expression rowFilter() { return deleteExpression; } - /** - * Returns added data files. - * - * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link #addedDataFiles()}. - */ - @Deprecated - protected List addedFiles() { - return addedDataFiles(); - } - protected List addedDataFiles() { return ImmutableList.copyOf(newDataFiles); } @@ -688,18 +678,6 @@ private CloseableIterable> deletedDataFiles( return manifestGroup.entries(); } - /** - * Sets a data sequence number for new data files. - * - * @param sequenceNumber a data sequence number - * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link - * #setNewDataFilesDataSequenceNumber(long)}; - */ - @Deprecated - protected void setNewFilesSequenceNumber(long sequenceNumber) { - setNewDataFilesDataSequenceNumber(sequenceNumber); - } - protected void setNewDataFilesDataSequenceNumber(long sequenceNumber) { this.newDataFilesDataSequenceNumber = sequenceNumber; } diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java deleted file mode 100644 index 550e8456345c..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFilesActionResult.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -/** - * @deprecated will be removed in 1.4.0; use {@link ImmutableDeleteOrphanFiles.Result#builder()} - * instead. - */ -@Deprecated -public class BaseDeleteOrphanFilesActionResult implements DeleteOrphanFiles.Result { - - private final Iterable orphanFileLocations; - - public BaseDeleteOrphanFilesActionResult(Iterable orphanFileLocations) { - this.orphanFileLocations = orphanFileLocations; - } - - @Override - public Iterable orphanFileLocations() { - return orphanFileLocations; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java deleted file mode 100644 index 74ec1b63a06b..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteReachableFilesActionResult.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -/** - * @deprecated will be removed in 1.4.0; use {@link ImmutableDeleteReachableFiles.Result#builder()} - * instead. - */ -@Deprecated -public class BaseDeleteReachableFilesActionResult implements DeleteReachableFiles.Result { - - private final long deletedDataFilesCount; - private final long deletedPosDeleteFilesCount; - private final long deletedEqDeleteFilesCount; - private final long deletedManifestsCount; - private final long deletedManifestListsCount; - private final long deletedOtherFilesCount; - - public BaseDeleteReachableFilesActionResult( - long deletedDataFilesCount, - long deletedManifestsCount, - long deletedManifestListsCount, - long otherDeletedFilesCount) { - this.deletedDataFilesCount = deletedDataFilesCount; - this.deletedPosDeleteFilesCount = 0; - this.deletedEqDeleteFilesCount = 0; - this.deletedManifestsCount = deletedManifestsCount; - this.deletedManifestListsCount = deletedManifestListsCount; - this.deletedOtherFilesCount = otherDeletedFilesCount; - } - - public BaseDeleteReachableFilesActionResult( - long deletedDataFilesCount, - long deletedPosDeleteFilesCount, - long deletedEqDeleteFilesCount, - long deletedManifestsCount, - long deletedManifestListsCount, - long otherDeletedFilesCount) { - this.deletedDataFilesCount = deletedDataFilesCount; - this.deletedPosDeleteFilesCount = deletedPosDeleteFilesCount; - this.deletedEqDeleteFilesCount = deletedEqDeleteFilesCount; - this.deletedManifestsCount = deletedManifestsCount; - this.deletedManifestListsCount = deletedManifestListsCount; - this.deletedOtherFilesCount = otherDeletedFilesCount; - } - - @Override - public long deletedDataFilesCount() { - return deletedDataFilesCount; - } - - @Override - public long deletedPositionDeleteFilesCount() { - return deletedPosDeleteFilesCount; - } - - @Override - public long deletedEqualityDeleteFilesCount() { - return deletedEqDeleteFilesCount; - } - - @Override - public long deletedManifestsCount() { - return deletedManifestsCount; - } - - @Override - public long deletedManifestListsCount() { - return deletedManifestListsCount; - } - - @Override - public long deletedOtherFilesCount() { - return deletedOtherFilesCount; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java deleted file mode 100644 index d238b7fbdffc..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo; -import org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult; - -/** - * @deprecated will be removed in 1.4.0; use {@link - * ImmutableRewriteDataFiles.FileGroupRewriteResult#builder()} instead. - */ -@Deprecated -public class BaseFileGroupRewriteResult implements FileGroupRewriteResult { - private final int addedDataFilesCount; - private final int rewrittenDataFilesCount; - private final long rewrittenBytesCount; - private final FileGroupInfo info; - - public BaseFileGroupRewriteResult( - FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount) { - this(info, addedFilesCount, rewrittenFilesCount, 0L); - } - - public BaseFileGroupRewriteResult( - FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount, long rewrittenBytesCount) { - this.info = info; - this.addedDataFilesCount = addedFilesCount; - this.rewrittenDataFilesCount = rewrittenFilesCount; - this.rewrittenBytesCount = rewrittenBytesCount; - } - - @Override - public FileGroupInfo info() { - return info; - } - - @Override - public int addedDataFilesCount() { - return addedDataFilesCount; - } - - @Override - public int rewrittenDataFilesCount() { - return rewrittenDataFilesCount; - } - - @Override - public long rewrittenBytesCount() { - return rewrittenBytesCount; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java deleted file mode 100644 index 834498d3a13e..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseMigrateTableActionResult.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -/** - * @deprecated will be removed in 1.4.0; use {@link ImmutableMigrateTable.Result#builder()} instead. - */ -@Deprecated -public class BaseMigrateTableActionResult implements MigrateTable.Result { - - private final long migratedDataFilesCount; - - public BaseMigrateTableActionResult(long migratedDataFilesCount) { - this.migratedDataFilesCount = migratedDataFilesCount; - } - - @Override - public long migratedDataFilesCount() { - return migratedDataFilesCount; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesFileGroupInfo.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesFileGroupInfo.java deleted file mode 100644 index fad8b4b5b8b5..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesFileGroupInfo.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import org.apache.iceberg.StructLike; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; - -/** - * @deprecated will be removed in 1.4.0; use {@link - * ImmutableRewriteDataFiles.FileGroupInfo#builder()} instead. - */ -@Deprecated -public class BaseRewriteDataFilesFileGroupInfo implements RewriteDataFiles.FileGroupInfo { - private final int globalIndex; - private final int partitionIndex; - private final StructLike partition; - - public BaseRewriteDataFilesFileGroupInfo( - int globalIndex, int partitionIndex, StructLike partition) { - this.globalIndex = globalIndex; - this.partitionIndex = partitionIndex; - this.partition = partition; - } - - @Override - public int globalIndex() { - return globalIndex; - } - - @Override - public int partitionIndex() { - return partitionIndex; - } - - @Override - public StructLike partition() { - return partition; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("globalIndex", globalIndex) - .add("partitionIndex", partitionIndex) - .add("partition", partition) - .toString(); - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesResult.java deleted file mode 100644 index a99f40592808..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesResult.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import java.util.List; -import org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult; -import org.apache.iceberg.actions.RewriteDataFiles.Result; - -/** - * @deprecated will be removed in 1.4.0; use {@link ImmutableRewriteDataFiles.Result#builder()} - * instead. - */ -@Deprecated -public class BaseRewriteDataFilesResult implements Result { - private final List rewriteResults; - - public BaseRewriteDataFilesResult(List rewriteResults) { - this.rewriteResults = rewriteResults; - } - - @Override - public List rewriteResults() { - return rewriteResults; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteManifestsActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteManifestsActionResult.java deleted file mode 100644 index e9fc067bc00c..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteManifestsActionResult.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; - -/** - * @deprecated will be removed in 1.4.0; use {@link ImmutableRewriteManifests.Result#builder()} - * instead. - */ -@Deprecated -public class BaseRewriteManifestsActionResult implements RewriteManifests.Result { - - private final Iterable rewrittenManifests; - private final Iterable addedManifests; - - public BaseRewriteManifestsActionResult( - Iterable rewrittenManifests, Iterable addedManifests) { - this.rewrittenManifests = rewrittenManifests; - this.addedManifests = addedManifests; - } - - public static RewriteManifests.Result empty() { - return new BaseRewriteManifestsActionResult(ImmutableList.of(), ImmutableList.of()); - } - - @Override - public Iterable rewrittenManifests() { - return rewrittenManifests; - } - - @Override - public Iterable addedManifests() { - return addedManifests; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java deleted file mode 100644 index 701fe34000a4..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotTableActionResult.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -/** - * @deprecated will be removed in 1.4.0; use {@link ImmutableSnapshotTable.Result#builder()} - * instead. - */ -@Deprecated -public class BaseSnapshotTableActionResult implements SnapshotTable.Result { - - private final long importedDataFilesCount; - - public BaseSnapshotTableActionResult(long importedDataFilesCount) { - this.importedDataFilesCount = importedDataFilesCount; - } - - @Override - public long importedDataFilesCount() { - return importedDataFilesCount; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java index 8e0c0b01dd90..ecca60cef2fb 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java @@ -46,6 +46,7 @@ * RewriteDataFiles#TARGET_FILE_SIZE_BYTES}. * * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead. + * Note: This can only be removed once Spark 3.1 + 3.2 isn't using this API anymore. */ @Deprecated public abstract class BinPackStrategy implements RewriteStrategy { diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteStrategy.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteStrategy.java deleted file mode 100644 index 4859883eb047..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteStrategy.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.Table; - -/** - * A strategy for an action to rewrite position delete files. - * - * @deprecated since 1.3.0, will be removed in 1.4.0; Use {@link SizeBasedFileRewriter} instead - */ -@Deprecated -public interface RewritePositionDeleteStrategy { - - /** Returns the name of this rewrite deletes strategy */ - String name(); - - /** Returns the table being modified by this rewrite strategy */ - Table table(); - - /** - * Returns a set of options which this rewrite strategy can use. This is an allowed-list and any - * options not specified here will be rejected at runtime. - */ - Set validOptions(); - - /** Sets options to be used with this strategy */ - RewritePositionDeleteStrategy options(Map options); - - /** - * Select the delete files to rewrite. - * - * @param deleteFiles iterable of delete files in a group. - * @return iterable of original delete file to be replaced. - */ - Iterable selectDeleteFiles(Iterable deleteFiles); - - /** - * Groups into lists which will be processed in a single executable unit. Each group will end up - * being committed as an independent set of changes. This creates the jobs which will eventually - * be run as by the underlying Action. - * - * @param deleteFiles iterable of DeleteFile to be rewritten - * @return iterable of lists of FileScanTasks which will be processed together - */ - Iterable> planDeleteFileGroups(Iterable deleteFiles); - - /** - * Define how to rewrite the deletes. - * - * @param deleteFilesToRewrite a group of files to be rewritten together - * @return iterable of delete files used to replace the original delete files. - */ - Iterable rewriteDeleteFiles(Iterable deleteFilesToRewrite); -} diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java b/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java index d3a450ddfb93..a09282f35ce2 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java @@ -29,7 +29,8 @@ /** * A strategy for rewriting files. * - * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link FileRewriter} instead. + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link FileRewriter} instead. Note: This + * can only be removed once Spark 3.1 + 3.2 isn't using this API anymore. */ @Deprecated public interface RewriteStrategy extends Serializable { diff --git a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java index 59decb802066..733e29cd56b7 100644 --- a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java @@ -39,6 +39,7 @@ *

In the future other algorithms for determining files to rewrite will be provided. * * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead. + * Note: This can only be removed once Spark 3.1 + 3.2 isn't using this API anymore. */ @Deprecated public abstract class SortStrategy extends BinPackStrategy { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java index 72b6268026ab..d970ad7e19b0 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java @@ -34,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult; import org.apache.iceberg.actions.DeleteOrphanFiles; +import org.apache.iceberg.actions.ImmutableDeleteOrphanFiles; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; @@ -189,7 +189,7 @@ private DeleteOrphanFiles.Result doExecute() { .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) .run(deleteFunc::accept); - return new BaseDeleteOrphanFilesActionResult(orphanFiles); + return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); } private Dataset buildActualFileDF() { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java index bfce42bb2580..fc63e88ec7a6 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java @@ -30,8 +30,8 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; -import org.apache.iceberg.actions.BaseDeleteReachableFilesActionResult; import org.apache.iceberg.actions.DeleteReachableFiles; +import org.apache.iceberg.actions.ImmutableDeleteReachableFiles; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopFileIO; @@ -161,7 +161,7 @@ protected Dataset buildOtherMetadataFileDF(Table table) { * @param deleted an Iterator of Spark Rows of the structure (path: String, type: String) * @return Statistics on which files were deleted */ - private BaseDeleteReachableFilesActionResult deleteFiles(Iterator deleted) { + private DeleteReachableFiles.Result deleteFiles(Iterator deleted) { AtomicLong dataFileCount = new AtomicLong(0L); AtomicLong manifestCount = new AtomicLong(0L); AtomicLong manifestListCount = new AtomicLong(0L); @@ -206,7 +206,11 @@ private BaseDeleteReachableFilesActionResult deleteFiles(Iterator deleted) long filesCount = dataFileCount.get() + manifestCount.get() + manifestListCount.get() + otherFilesCount.get(); LOG.info("Total files removed: {}", filesCount); - return new BaseDeleteReachableFilesActionResult( - dataFileCount.get(), manifestCount.get(), manifestListCount.get(), otherFilesCount.get()); + return ImmutableDeleteReachableFiles.Result.builder() + .deletedDataFilesCount(dataFileCount.get()) + .deletedManifestsCount(manifestCount.get()) + .deletedManifestListsCount(manifestListCount.get()) + .deletedOtherFilesCount(otherFilesCount.get()) + .build(); } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java index 67297da6c27b..6be7d24f8bad 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java @@ -22,7 +22,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BaseMigrateTableActionResult; +import org.apache.iceberg.actions.ImmutableMigrateTable; import org.apache.iceberg.actions.MigrateTable; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -163,7 +163,9 @@ private MigrateTable.Result doExecute() { "Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent()); - return new BaseMigrateTableActionResult(migratedDataFilesCount); + return ImmutableMigrateTable.Result.builder() + .migratedDataFilesCount(migratedDataFilesCount) + .build(); } @Override diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java index 3d02d9d87f3b..58f269a35a0c 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.math.RoundingMode; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -38,9 +37,8 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BaseRewriteDataFilesFileGroupInfo; -import org.apache.iceberg.actions.BaseRewriteDataFilesResult; import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.ImmutableRewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFilesCommitManager; import org.apache.iceberg.actions.RewriteFileGroup; @@ -153,7 +151,7 @@ public RewriteDataFiles filter(Expression expression) { @Override public RewriteDataFiles.Result execute() { if (table.currentSnapshot() == null) { - return new BaseRewriteDataFilesResult(ImmutableList.of()); + return ImmutableRewriteDataFiles.Result.builder().build(); } long startingSnapshotId = table.currentSnapshot().snapshotId(); @@ -171,7 +169,7 @@ public RewriteDataFiles.Result execute() { if (ctx.totalGroupCount() == 0) { LOG.info("Nothing found to rewrite in {}", table.name()); - return new BaseRewriteDataFilesResult(Collections.emptyList()); + return ImmutableRewriteDataFiles.Result.builder().build(); } Stream groupStream = toGroupStream(ctx, fileGroupsByPartition); @@ -330,7 +328,7 @@ private Result doExecute( List rewriteResults = rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); - return new BaseRewriteDataFilesResult(rewriteResults); + return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build(); } private Result doExecuteWithPartialProgress( @@ -370,7 +368,7 @@ private Result doExecuteWithPartialProgress( List rewriteResults = commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); - return new BaseRewriteDataFilesResult(rewriteResults); + return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build(); } Stream toGroupStream( @@ -389,8 +387,11 @@ Stream toGroupStream( int globalIndex = ctx.currentGlobalIndex(); int partitionIndex = ctx.currentPartitionIndex(partition); FileGroupInfo info = - new BaseRewriteDataFilesFileGroupInfo( - globalIndex, partitionIndex, partition); + ImmutableRewriteDataFiles.FileGroupInfo.builder() + .globalIndex(globalIndex) + .partitionIndex(partitionIndex) + .partition(partition) + .build(); return new RewriteFileGroup(info, tasks); }); }); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java index 078b2e00bca9..eeb56b45ae84 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java @@ -40,7 +40,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.actions.BaseRewriteManifestsActionResult; +import org.apache.iceberg.actions.ImmutableRewriteManifests; import org.apache.iceberg.actions.RewriteManifests; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; @@ -150,7 +150,7 @@ public RewriteManifests.Result execute() { private RewriteManifests.Result doExecute() { List matchingManifests = findMatchingManifests(); if (matchingManifests.isEmpty()) { - return BaseRewriteManifestsActionResult.empty(); + return ImmutableRewriteManifests.Result.builder().build(); } long totalSizeBytes = 0L; @@ -169,7 +169,7 @@ private RewriteManifests.Result doExecute() { int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests); if (targetNumManifests == 1 && matchingManifests.size() == 1) { - return BaseRewriteManifestsActionResult.empty(); + return ImmutableRewriteManifests.Result.builder().build(); } Dataset manifestEntryDF = buildManifestEntryDF(matchingManifests); @@ -185,7 +185,10 @@ private RewriteManifests.Result doExecute() { replaceManifests(matchingManifests, newManifests); - return new BaseRewriteManifestsActionResult(matchingManifests, newManifests); + return ImmutableRewriteManifests.Result.builder() + .rewrittenManifests(matchingManifests) + .addedManifests(newManifests) + .build(); } private Dataset buildManifestEntryDF(List manifests) { diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java index a170ca23a5ab..51c4d26368d6 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java @@ -23,7 +23,7 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.actions.BaseSnapshotTableActionResult; +import org.apache.iceberg.actions.ImmutableSnapshotTable; import org.apache.iceberg.actions.SnapshotTable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -163,7 +163,9 @@ private SnapshotTable.Result doExecute() { "Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent()); - return new BaseSnapshotTableActionResult(importedDataFilesCount); + return ImmutableSnapshotTable.Result.builder() + .importedDataFilesCount(importedDataFilesCount) + .build(); } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index ea73403c2e60..b00ed42008f1 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -44,8 +44,8 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult; import org.apache.iceberg.actions.DeleteOrphanFiles; +import org.apache.iceberg.actions.ImmutableDeleteOrphanFiles; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; import org.apache.iceberg.io.BulkDeletionFailureException; @@ -272,7 +272,7 @@ private DeleteOrphanFiles.Result doExecute() { } } - return new BaseDeleteOrphanFilesActionResult(orphanFiles); + return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); } private Dataset validFileIdentDS() { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index cdc60a659d7c..ea6ac9f3dbf5 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -27,8 +27,8 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; -import org.apache.iceberg.actions.BaseDeleteReachableFilesActionResult; import org.apache.iceberg.actions.DeleteReachableFiles; +import org.apache.iceberg.actions.ImmutableDeleteReachableFiles; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; @@ -144,12 +144,13 @@ private DeleteReachableFiles.Result deleteFiles(Iterator files) { LOG.info("Deleted {} total files", summary.totalFilesCount()); - return new BaseDeleteReachableFilesActionResult( - summary.dataFilesCount(), - summary.positionDeleteFilesCount(), - summary.equalityDeleteFilesCount(), - summary.manifestsCount(), - summary.manifestListsCount(), - summary.otherFilesCount()); + return ImmutableDeleteReachableFiles.Result.builder() + .deletedDataFilesCount(summary.dataFilesCount()) + .deletedPositionDeleteFilesCount(summary.positionDeleteFilesCount()) + .deletedEqualityDeleteFilesCount(summary.equalityDeleteFilesCount()) + .deletedManifestsCount(summary.manifestsCount()) + .deletedManifestListsCount(summary.manifestListsCount()) + .deletedOtherFilesCount(summary.otherFilesCount()) + .build(); } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index 9106f97e47c8..fe8acf0157d3 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -22,7 +22,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BaseMigrateTableActionResult; +import org.apache.iceberg.actions.ImmutableMigrateTable; import org.apache.iceberg.actions.MigrateTable; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -162,7 +162,9 @@ private MigrateTable.Result doExecute() { "Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent()); - return new BaseMigrateTableActionResult(migratedDataFilesCount); + return ImmutableMigrateTable.Result.builder() + .migratedDataFilesCount(migratedDataFilesCount) + .build(); } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index eeb4e49e30e8..9ceb1766e950 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.math.RoundingMode; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -38,9 +37,8 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BaseRewriteDataFilesFileGroupInfo; -import org.apache.iceberg.actions.BaseRewriteDataFilesResult; import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.ImmutableRewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFilesCommitManager; import org.apache.iceberg.actions.RewriteFileGroup; @@ -157,7 +155,7 @@ public RewriteDataFilesSparkAction filter(Expression expression) { @Override public RewriteDataFiles.Result execute() { if (table.currentSnapshot() == null) { - return new BaseRewriteDataFilesResult(ImmutableList.of()); + return ImmutableRewriteDataFiles.Result.builder().build(); } long startingSnapshotId = table.currentSnapshot().snapshotId(); @@ -175,7 +173,7 @@ public RewriteDataFiles.Result execute() { if (ctx.totalGroupCount() == 0) { LOG.info("Nothing found to rewrite in {}", table.name()); - return new BaseRewriteDataFilesResult(Collections.emptyList()); + return ImmutableRewriteDataFiles.Result.builder().build(); } Stream groupStream = toGroupStream(ctx, fileGroupsByPartition); @@ -334,7 +332,7 @@ private Result doExecute( List rewriteResults = rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); - return new BaseRewriteDataFilesResult(rewriteResults); + return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build(); } private Result doExecuteWithPartialProgress( @@ -374,7 +372,7 @@ private Result doExecuteWithPartialProgress( List rewriteResults = commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); - return new BaseRewriteDataFilesResult(rewriteResults); + return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build(); } Stream toGroupStream( @@ -392,8 +390,11 @@ Stream toGroupStream( int globalIndex = ctx.currentGlobalIndex(); int partitionIndex = ctx.currentPartitionIndex(partition); FileGroupInfo info = - new BaseRewriteDataFilesFileGroupInfo( - globalIndex, partitionIndex, partition); + ImmutableRewriteDataFiles.FileGroupInfo.builder() + .globalIndex(globalIndex) + .partitionIndex(partitionIndex) + .partition(partition) + .build(); return new RewriteFileGroup(info, tasks); }); }); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 860168ae0a4e..5efc70a4fc48 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -41,7 +41,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.actions.BaseRewriteManifestsActionResult; +import org.apache.iceberg.actions.ImmutableRewriteManifests; import org.apache.iceberg.actions.RewriteManifests; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; @@ -151,7 +151,7 @@ public RewriteManifests.Result execute() { private RewriteManifests.Result doExecute() { List matchingManifests = findMatchingManifests(); if (matchingManifests.isEmpty()) { - return BaseRewriteManifestsActionResult.empty(); + return ImmutableRewriteManifests.Result.builder().build(); } long totalSizeBytes = 0L; @@ -170,7 +170,7 @@ private RewriteManifests.Result doExecute() { int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests); if (targetNumManifests == 1 && matchingManifests.size() == 1) { - return BaseRewriteManifestsActionResult.empty(); + return ImmutableRewriteManifests.Result.builder().build(); } Dataset manifestEntryDF = buildManifestEntryDF(matchingManifests); @@ -186,7 +186,10 @@ private RewriteManifests.Result doExecute() { replaceManifests(matchingManifests, newManifests); - return new BaseRewriteManifestsActionResult(matchingManifests, newManifests); + return ImmutableRewriteManifests.Result.builder() + .addedManifests(newManifests) + .rewrittenManifests(matchingManifests) + .build(); } private Dataset buildManifestEntryDF(List manifests) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java index 289e408b8960..8e59c13543f8 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java @@ -23,7 +23,7 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.actions.BaseSnapshotTableActionResult; +import org.apache.iceberg.actions.ImmutableSnapshotTable; import org.apache.iceberg.actions.SnapshotTable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -150,7 +150,9 @@ private SnapshotTable.Result doExecute() { "Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent()); - return new BaseSnapshotTableActionResult(importedDataFilesCount); + return ImmutableSnapshotTable.Result.builder() + .importedDataFilesCount(importedDataFilesCount) + .build(); } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java deleted file mode 100644 index 07d3210ead66..000000000000 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.actions; - -import java.util.List; -import java.util.Set; -import java.util.UUID; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DistributionMode; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BinPackStrategy; -import org.apache.iceberg.spark.FileRewriteCoordinator; -import org.apache.iceberg.spark.ScanTaskSetManager; -import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.SparkTableCache; -import org.apache.iceberg.spark.SparkWriteOptions; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; - -/** - * A Spark strategy to bin-pack data. - * - * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkBinPackDataRewriter} instead. - */ -@Deprecated -public class SparkBinPackStrategy extends BinPackStrategy { - private final Table table; - private final SparkSession spark; - private final SparkTableCache tableCache = SparkTableCache.get(); - private final ScanTaskSetManager manager = ScanTaskSetManager.get(); - private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); - - public SparkBinPackStrategy(Table table, SparkSession spark) { - this.table = table; - this.spark = spark; - } - - @Override - public Table table() { - return table; - } - - @Override - public Set rewriteFiles(List filesToRewrite) { - String groupID = UUID.randomUUID().toString(); - try { - tableCache.add(groupID, table); - manager.stageTasks(table, groupID, filesToRewrite); - - Dataset scanDF = - spark - .read() - .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID) - .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite))) - .option(SparkReadOptions.FILE_OPEN_COST, "0") - .load(groupID); - - // All files within a file group are written with the same spec, so check the first - boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec()); - - // Invoke a shuffle if the partition spec of the incoming partition does not match the table - String distributionMode = - requiresRepartition - ? DistributionMode.RANGE.modeName() - : DistributionMode.NONE.modeName(); - - // write the packed data into new files where each split becomes a new file - scanDF - .write() - .format("iceberg") - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) - .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) - .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode) - .mode("append") - .save(groupID); - - return rewriteCoordinator.fetchNewDataFiles(table, groupID); - } finally { - tableCache.remove(groupID); - manager.removeTasks(table, groupID); - rewriteCoordinator.clearRewrite(table, groupID); - } - } -} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java deleted file mode 100644 index 21e29263c925..000000000000 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.actions; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Table; -import org.apache.iceberg.actions.RewriteStrategy; -import org.apache.iceberg.actions.SortStrategy; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.spark.FileRewriteCoordinator; -import org.apache.iceberg.spark.ScanTaskSetManager; -import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil; -import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.SparkTableCache; -import org.apache.iceberg.spark.SparkWriteOptions; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.iceberg.util.SortOrderUtil; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$; -import org.apache.spark.sql.connector.distributions.Distribution; -import org.apache.spark.sql.connector.distributions.Distributions; -import org.apache.spark.sql.connector.expressions.SortOrder; -import org.apache.spark.sql.internal.SQLConf; - -/** - * A Spark strategy to sort data. - * - * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkSortDataRewriter} instead. - */ -@Deprecated -public class SparkSortStrategy extends SortStrategy { - - /** - * The number of shuffle partitions and consequently the number of output files created by the - * Spark Sort is based on the size of the input data files used in this rewrite operation. Due to - * compression, the disk file sizes may not accurately represent the size of files in the output. - * This parameter lets the user adjust the file size used for estimating actual output data size. - * A factor greater than 1.0 would generate more files than we would expect based on the on-disk - * file size. A value less than 1.0 would create fewer files than we would expect due to the - * on-disk size. - */ - public static final String COMPRESSION_FACTOR = "compression-factor"; - - private final Table table; - private final SparkSession spark; - private final SparkTableCache tableCache = SparkTableCache.get(); - private final ScanTaskSetManager manager = ScanTaskSetManager.get(); - private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); - - private double sizeEstimateMultiple; - - public SparkSortStrategy(Table table, SparkSession spark) { - this.table = table; - this.spark = spark; - } - - @Override - public Table table() { - return table; - } - - @Override - public Set validOptions() { - return ImmutableSet.builder() - .addAll(super.validOptions()) - .add(COMPRESSION_FACTOR) - .build(); - } - - @Override - public RewriteStrategy options(Map options) { - sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options, COMPRESSION_FACTOR, 1.0); - - Preconditions.checkArgument( - sizeEstimateMultiple > 0, - "Invalid compression factor: %s (not positive)", - sizeEstimateMultiple); - - return super.options(options); - } - - @Override - public Set rewriteFiles(List filesToRewrite) { - String groupID = UUID.randomUUID().toString(); - boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec()); - - SortOrder[] ordering; - if (requiresRepartition) { - // Build in the requirement for Partition Sorting into our sort order - ordering = - SparkDistributionAndOrderingUtil.convert( - SortOrderUtil.buildSortOrder(table, sortOrder())); - } else { - ordering = SparkDistributionAndOrderingUtil.convert(sortOrder()); - } - - Distribution distribution = Distributions.ordered(ordering); - - try { - tableCache.add(groupID, table); - manager.stageTasks(table, groupID, filesToRewrite); - - // Reset Shuffle Partitions for our sort - long numOutputFiles = - numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple)); - spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles)); - - Dataset scanDF = - spark - .read() - .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID) - .load(groupID); - - // write the packed data into new files where each split becomes a new file - SQLConf sqlConf = spark.sessionState().conf(); - LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf); - Dataset sortedDf = new Dataset<>(spark, sortPlan, scanDF.encoder()); - - sortedDf - .write() - .format("iceberg") - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) - .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) - .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") - .mode("append") // This will only write files without modifying the table, see - // SparkWrite.RewriteFiles - .save(groupID); - - return rewriteCoordinator.fetchNewDataFiles(table, groupID); - } finally { - tableCache.remove(groupID); - manager.removeTasks(table, groupID); - rewriteCoordinator.clearRewrite(table, groupID); - } - } - - protected SparkSession spark() { - return this.spark; - } - - protected LogicalPlan sortPlan( - Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) { - return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf); - } - - protected double sizeEstimateMultiple() { - return sizeEstimateMultiple; - } - - protected SparkTableCache tableCache() { - return tableCache; - } - - protected ScanTaskSetManager manager() { - return manager; - } - - protected FileRewriteCoordinator rewriteCoordinator() { - return rewriteCoordinator; - } -} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java deleted file mode 100644 index 26d2b4837b4b..000000000000 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.actions; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.NullOrder; -import org.apache.iceberg.PartitionField; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SortDirection; -import org.apache.iceberg.Table; -import org.apache.iceberg.actions.RewriteStrategy; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil; -import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.spark.SparkWriteOptions; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.types.Types.NestedField; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.iceberg.util.SortOrderUtil; -import org.apache.iceberg.util.ZOrderByteUtils; -import org.apache.spark.sql.Column; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.connector.distributions.Distribution; -import org.apache.spark.sql.connector.distributions.Distributions; -import org.apache.spark.sql.connector.expressions.SortOrder; -import org.apache.spark.sql.functions; -import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.sql.types.StructField; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Spark strategy to zOrder data. - * - * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkZOrderDataRewriter} instead. - */ -@Deprecated -public class SparkZOrderStrategy extends SparkSortStrategy { - private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderStrategy.class); - - private static final String Z_COLUMN = "ICEZVALUE"; - private static final Schema Z_SCHEMA = - new Schema(NestedField.required(0, Z_COLUMN, Types.BinaryType.get())); - private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = - org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA) - .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST) - .build(); - - /** - * Controls the amount of bytes interleaved in the ZOrder Algorithm. Default is all bytes being - * interleaved. - */ - private static final String MAX_OUTPUT_SIZE_KEY = "max-output-size"; - - private static final int DEFAULT_MAX_OUTPUT_SIZE = Integer.MAX_VALUE; - - /** - * Controls the number of bytes considered from an input column of a type with variable length - * (String, Binary). Default is to use the same size as primitives {@link - * ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE} - */ - private static final String VAR_LENGTH_CONTRIBUTION_KEY = "var-length-contribution"; - - private static final int DEFAULT_VAR_LENGTH_CONTRIBUTION = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE; - - private final List zOrderColNames; - - private int maxOutputSize; - private int varLengthContribution; - - @Override - public Set validOptions() { - return ImmutableSet.builder() - .addAll(super.validOptions()) - .add(VAR_LENGTH_CONTRIBUTION_KEY) - .add(MAX_OUTPUT_SIZE_KEY) - .build(); - } - - @Override - public RewriteStrategy options(Map options) { - super.options(options); - - varLengthContribution = - PropertyUtil.propertyAsInt( - options, VAR_LENGTH_CONTRIBUTION_KEY, DEFAULT_VAR_LENGTH_CONTRIBUTION); - Preconditions.checkArgument( - varLengthContribution > 0, - "Cannot use less than 1 byte for variable length types with zOrder, %s was set to %s", - VAR_LENGTH_CONTRIBUTION_KEY, - varLengthContribution); - - maxOutputSize = - PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE_KEY, DEFAULT_MAX_OUTPUT_SIZE); - Preconditions.checkArgument( - maxOutputSize > 0, - "Cannot have the interleaved ZOrder value use less than 1 byte, %s was set to %s", - MAX_OUTPUT_SIZE_KEY, - maxOutputSize); - - return this; - } - - public SparkZOrderStrategy(Table table, SparkSession spark, List zOrderColNames) { - super(table, spark); - - Preconditions.checkArgument( - zOrderColNames != null && !zOrderColNames.isEmpty(), - "Cannot ZOrder when no columns are specified"); - - Stream identityPartitionColumns = - table.spec().fields().stream() - .filter(f -> f.transform().isIdentity()) - .map(PartitionField::name); - List partZOrderCols = - identityPartitionColumns.filter(zOrderColNames::contains).collect(Collectors.toList()); - - if (!partZOrderCols.isEmpty()) { - LOG.warn( - "Cannot ZOrder on an Identity partition column as these values are constant within a partition " - + "and will be removed from the ZOrder expression: {}", - partZOrderCols); - zOrderColNames.removeAll(partZOrderCols); - Preconditions.checkArgument( - !zOrderColNames.isEmpty(), - "Cannot perform ZOrdering, all columns provided were identity partition columns and cannot be used."); - } - - validateColumnsExistence(table, spark, zOrderColNames); - - this.zOrderColNames = zOrderColNames; - } - - private void validateColumnsExistence(Table table, SparkSession spark, List colNames) { - boolean caseSensitive = SparkUtil.caseSensitive(spark); - Schema schema = table.schema(); - colNames.forEach( - col -> { - NestedField nestedField = - caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col); - if (nestedField == null) { - throw new IllegalArgumentException( - String.format( - "Cannot find column '%s' in table schema: %s", col, schema.asStruct())); - } - }); - } - - @Override - public String name() { - return "Z-ORDER"; - } - - @Override - protected void validateOptions() { - // Ignore SortStrategy validation - return; - } - - @Override - public Set rewriteFiles(List filesToRewrite) { - SparkZOrderUDF zOrderUDF = - new SparkZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize); - - String groupID = UUID.randomUUID().toString(); - boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table().spec()); - - SortOrder[] ordering; - if (requiresRepartition) { - ordering = - SparkDistributionAndOrderingUtil.convert( - SortOrderUtil.buildSortOrder(table(), sortOrder())); - } else { - ordering = SparkDistributionAndOrderingUtil.convert(sortOrder()); - } - - Distribution distribution = Distributions.ordered(ordering); - - try { - tableCache().add(groupID, table()); - manager().stageTasks(table(), groupID, filesToRewrite); - - // spark session from parent - SparkSession spark = spark(); - // Reset Shuffle Partitions for our sort - long numOutputFiles = - numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple())); - spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles)); - - Dataset scanDF = - spark - .read() - .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID) - .load(groupID); - - Column[] originalColumns = - Arrays.stream(scanDF.schema().names()).map(n -> functions.col(n)).toArray(Column[]::new); - - List zOrderColumns = - zOrderColNames.stream().map(scanDF.schema()::apply).collect(Collectors.toList()); - - Column zvalueArray = - functions.array( - zOrderColumns.stream() - .map( - colStruct -> - zOrderUDF.sortedLexicographically( - functions.col(colStruct.name()), colStruct.dataType())) - .toArray(Column[]::new)); - - Dataset zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray)); - - SQLConf sqlConf = spark.sessionState().conf(); - LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf); - Dataset sortedDf = new Dataset<>(spark, sortPlan, zvalueDF.encoder()); - sortedDf - .select(originalColumns) - .write() - .format("iceberg") - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) - .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) - .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") - .mode("append") - .save(groupID); - - return rewriteCoordinator().fetchNewDataFiles(table(), groupID); - } finally { - tableCache().remove(groupID); - manager().removeTasks(table(), groupID); - rewriteCoordinator().clearRewrite(table(), groupID); - } - } - - @Override - protected org.apache.iceberg.SortOrder sortOrder() { - return Z_SORT_ORDER; - } -} diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java index eaef8e0bccaa..b08c35281905 100644 --- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java +++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java @@ -35,7 +35,7 @@ import org.apache.iceberg.SortDirection; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.SizeBasedFileRewriter; import org.apache.iceberg.relocated.com.google.common.io.Files; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; @@ -105,7 +105,7 @@ public void cleanUpIteration() throws IOException { public void sortInt() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -118,7 +118,7 @@ public void sortInt() { public void sortInt2() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -132,7 +132,7 @@ public void sortInt2() { public void sortInt3() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -148,7 +148,7 @@ public void sortInt3() { public void sortInt4() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -164,7 +164,7 @@ public void sortInt4() { public void sortString() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -177,7 +177,7 @@ public void sortString() { public void sortFourColumns() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -193,7 +193,7 @@ public void sortFourColumns() { public void sortSixColumns() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .sort( SortOrder.builderFor(table().schema()) .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) @@ -211,7 +211,7 @@ public void sortSixColumns() { public void zSortInt() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("intCol") .execute(); } @@ -221,7 +221,7 @@ public void zSortInt() { public void zSortInt2() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("intCol", "intCol2") .execute(); } @@ -231,7 +231,7 @@ public void zSortInt2() { public void zSortInt3() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("intCol", "intCol2", "intCol3") .execute(); } @@ -241,7 +241,7 @@ public void zSortInt3() { public void zSortInt4() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("intCol", "intCol2", "intCol3", "intCol4") .execute(); } @@ -251,7 +251,7 @@ public void zSortInt4() { public void zSortString() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("stringCol") .execute(); } @@ -261,7 +261,7 @@ public void zSortString() { public void zSortFourColumns() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("stringCol", "intCol", "dateCol", "doubleCol") .execute(); } @@ -271,7 +271,7 @@ public void zSortFourColumns() { public void zSortSixColumns() { SparkActions.get() .rewriteDataFiles(table()) - .option(BinPackStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol") .execute(); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 0cb910364394..35741b74bcac 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -62,12 +62,12 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.actions.BinPackStrategy; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFiles.Result; import org.apache.iceberg.actions.RewriteDataFilesCommitManager; import org.apache.iceberg.actions.RewriteFileGroup; -import org.apache.iceberg.actions.SortStrategy; +import org.apache.iceberg.actions.SizeBasedDataRewriter; +import org.apache.iceberg.actions.SizeBasedFileRewriter; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; @@ -146,7 +146,7 @@ public void setupTableLocation() throws Exception { private RewriteDataFilesSparkAction basicRewrite(Table table) { // Always compact regardless of input files table.refresh(); - return actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1"); + return actions().rewriteDataFiles(table).option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1"); } @Test @@ -234,9 +234,10 @@ public void testBinPackAfterPartitionChange() { RewriteDataFiles.Result result = basicRewrite(table) - .option(SortStrategy.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") .option( - SortStrategy.MIN_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) + 1000)) + SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, + Integer.toString(averageFileSize(table) + 1000)) .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) + 1001)) @@ -288,10 +289,10 @@ public void testBinPackWithDeletes() throws Exception { actions() .rewriteDataFiles(table) // do not include any file based on bin pack file size configs - .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0") + .option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "0") .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) - .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE)) - .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2") + .option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE)) + .option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2") .execute(); Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount()); assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); @@ -326,7 +327,7 @@ public void testBinPackWithDeleteAllData() { Result result = actions() .rewriteDataFiles(table) - .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1") + .option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "1") .execute(); Assert.assertEquals("Action should rewrite 1 data files", 1, result.rewrittenDataFilesCount()); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); @@ -464,7 +465,7 @@ public void testBinPackSplitLargeFile() { Result result = basicRewrite(table) .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(targetSize)) - .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(targetSize * 2 - 2000)) + .option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, Long.toString(targetSize * 2 - 2000)) .execute(); Assert.assertEquals("Action should delete 1 data files", 1, result.rewrittenDataFilesCount()); @@ -495,8 +496,8 @@ public void testBinPackCombineMixedFiles() { Result result = basicRewrite(table) .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(targetSize + 1000)) - .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Integer.toString(targetSize + 80000)) - .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, Integer.toString(targetSize - 1000)) + .option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, Integer.toString(targetSize + 80000)) + .option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, Integer.toString(targetSize - 1000)) .execute(); Assert.assertEquals("Action should delete 3 data files", 3, result.rewrittenDataFilesCount()); @@ -524,9 +525,11 @@ public void testBinPackCombineMediumFiles() { Result result = basicRewrite(table) .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(targetSize)) - .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Integer.toString((int) (targetSize * 1.8))) .option( - BinPackStrategy.MIN_FILE_SIZE_BYTES, + SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, + Integer.toString((int) (targetSize * 1.8))) + .option( + SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, Integer.toString(targetSize - 100)) // All files too small .execute(); @@ -584,7 +587,7 @@ public void testMultipleGroups() { basicRewrite(table) .option( RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) - .option(BinPackStrategy.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") .execute(); Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10); @@ -920,7 +923,7 @@ public void testSortMultipleGroups() { RewriteDataFiles.Result result = basicRewrite(table) .sort() - .option(SortStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .option( RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000)) .execute(); @@ -950,8 +953,8 @@ public void testSimpleSort() { RewriteDataFiles.Result result = basicRewrite(table) .sort() - .option(SortStrategy.MIN_INPUT_FILES, "1") - .option(SortStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table))) .execute(); @@ -984,8 +987,8 @@ public void testSortAfterPartitionChange() { RewriteDataFiles.Result result = basicRewrite(table) .sort() - .option(SortStrategy.MIN_INPUT_FILES, "1") - .option(SortStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table))) .execute(); @@ -1019,7 +1022,7 @@ public void testSortCustomSortOrder() { RewriteDataFiles.Result result = basicRewrite(table) .sort(SortOrder.builderFor(table.schema()).asc("c2").build()) - .option(SortStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table))) .execute(); @@ -1057,7 +1060,7 @@ public void testSortCustomSortOrderRequiresRepartition() { RewriteDataFiles.Result result = basicRewrite(table) .sort(SortOrder.builderFor(table.schema()).asc("c3").build()) - .option(SortStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / partitions)) @@ -1091,13 +1094,13 @@ public void testAutoSortShuffleOutput() { basicRewrite(table) .sort(SortOrder.builderFor(table.schema()).asc("c2").build()) .option( - SortStrategy.MAX_FILE_SIZE_BYTES, + SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, Integer.toString((averageFileSize(table) / 2) + 2)) // Divide files in 2 .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / 2)) - .option(SortStrategy.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") .execute(); Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1); @@ -1170,13 +1173,13 @@ public void testZOrderSort() { basicRewrite(table) .zOrder("c2", "c3") .option( - SortStrategy.MAX_FILE_SIZE_BYTES, + SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, Integer.toString((averageFileSize(table) / 2) + 2)) // Divide files in 2 .option( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / 2)) - .option(SortStrategy.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") .execute(); Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); @@ -1231,8 +1234,8 @@ public void testZOrderAllTypesSort() { "stringCol", "binaryCol", "booleanCol") - .option(SortStrategy.MIN_INPUT_FILES, "1") - .option(SortStrategy.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1") + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .execute(); Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());