@@ -612,9 +612,10 @@ private void commit(SnapshotUpdate<?> update) {
*/
private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
String partitionPath, long fileSizeThreshold) {
- List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, fileSizeThreshold);
+ List<DataFile> existingDataFiles =
+     IcebergCompactionUtil.getDataFiles(table, snapshotId, partitionPath, fileSizeThreshold);
List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
-     IcebergCompactionUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();
+     IcebergCompactionUtil.getDeleteFiles(table, snapshotId, partitionPath) : Collections.emptyList();
@deniskuzZ (Member) commented on Feb 3, 2026:

please add test.
as an example you could use TestConflictingDataFiles#testConflictingUpdateAndDelete

Contributor Author replied:

Working on it.

RewriteFiles rewriteFiles = table.newRewrite();
existingDataFiles.forEach(rewriteFiles::deleteFile);
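
For context, a minimal sketch of the snapshot-pinned rewrite this change enables: the compaction captures the snapshot id when it starts, plans its file set against that snapshot through the new helper signatures, and validates the rewrite from the same snapshot. The compactedFiles list, the validateFromSnapshot call and the IcebergCompactionUtil package shown here are illustrative assumptions, not lines from this patch.

import java.util.List;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil; // assumed package

final class SnapshotPinnedRewriteSketch {

  // Plans the files to rewrite against the snapshot captured when the compaction started,
  // and validates the commit from that same snapshot, so data files added by concurrent
  // inserts after that point are not swept into the rewrite.
  static void commitRewrite(Table table, long snapshotId, String partitionPath,
      long fileSizeThreshold, List<DataFile> compactedFiles) {
    List<DataFile> filesToRewrite =
        IcebergCompactionUtil.getDataFiles(table, snapshotId, partitionPath, fileSizeThreshold);

    RewriteFiles rewrite = table.newRewrite();
    filesToRewrite.forEach(rewrite::deleteFile);
    compactedFiles.forEach(rewrite::addFile);
    rewrite.validateFromSnapshot(snapshotId); // Iceberg API, not shown in this hunk
    rewrite.commit();
  }
}
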
@@ -22,7 +22,6 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
- import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PositionDeletesScanTask;
@@ -69,15 +68,16 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
* @param table the iceberg table
* @param snapshotId id of the snapshot to scan
* @param partitionPath partition path
*/
- public static List<DataFile> getDataFiles(Table table, String partitionPath, long fileSizeThreshold) {
-   CloseableIterable<FileScanTask> fileScanTasks =
-       table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
-   CloseableIterable<FileScanTask> filteredFileScanTasks =
-       CloseableIterable.filter(fileScanTasks, t -> {
+ public static List<DataFile> getDataFiles(Table table, Long snapshotId, String partitionPath,
+     long fileSizeThreshold) {
+   CloseableIterable<ScanTask> scanTasks =
+       table.newBatchScan().useSnapshot(snapshotId).planFiles();
+   CloseableIterable<ScanTask> filteredScanTasks =
+       CloseableIterable.filter(scanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return shouldIncludeForCompaction(table, partitionPath, file, fileSizeThreshold);
});
- return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
+ return Lists.newArrayList(CloseableIterable.transform(filteredScanTasks, t -> t.asFileScanTask().file()));
}

/**
@@ -88,10 +88,10 @@ public static List<DataFile> getDataFiles(Table table, String partitionPath, lon
* @param table the iceberg table
* @param snapshotId id of the snapshot to scan
* @param partitionPath partition path
*/
- public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath) {
+ public static List<DeleteFile> getDeleteFiles(Table table, Long snapshotId, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
- CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
+ CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().useSnapshot(snapshotId).planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
@@ -101,8 +101,8 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
partitions.forEach(partition -> addCompactionTargetIfEligible(table, icebergTable,
partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));

- if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
-     icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
+ if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && hasModifiedPartitions(icebergTable,
+     snapshotTimeMilCache.get(qualifiedTableName), false)) {
addCompactionTargetIfEligible(table, icebergTable,
null, compactionTargets, currentCompactions, skipDBs, skipTables);
}
@@ -183,18 +183,8 @@ private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadat

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Submit a task for each snapshot and collect the Futures
- List<Future<Set<String>>> futures = relevantSnapshots.stream()
-     .map(snapshot -> executor.submit(() -> {
-       FileIO io = icebergTable.io();
-       List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
-           snapshot.addedDataFiles(io),
-           snapshot.removedDataFiles(io),
-           snapshot.addedDeleteFiles(io),
-           snapshot.removedDeleteFiles(io))
-           .toList();
-       return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
-     }))
-     .toList();
+ List<Future<Set<String>>> futures = createPartitionNameFutures(executor, icebergTable, relevantSnapshots,
+     latestSpecOnly);

// Collect the results from all completed futures
Set<String> modifiedPartitions = Sets.newHashSet();
@@ -212,6 +202,77 @@ private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
}
}

/**
* Checks whether any partitions were modified (files added or deleted) between a given past
* snapshot, identified by its commit timestamp, and the table's current (latest) snapshot.
* The method short-circuits as soon as it finds a modified partition, which makes it cheaper
* than findModifiedPartitions when only existence needs to be checked.
* @param icebergTable The Iceberg table to inspect.
* @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
* @param latestSpecOnly when True, checks partitions with the current spec only;
* False - older specs only;
* Null - any spec
* @return true if at least one modified partition exists, false otherwise.
*/
private boolean hasModifiedPartitions(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil,
Boolean latestSpecOnly) {
List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
if (relevantSnapshots.isEmpty()) {
return false;
}

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Submit a task for each snapshot and collect the Futures
List<Future<Set<String>>> futures = createPartitionNameFutures(
executor, icebergTable, relevantSnapshots, latestSpecOnly);

// Check futures in submission order and short-circuit on the first non-empty result
for (Future<Set<String>> future : futures) {
Set<String> partitionNames = future.get();
if (!partitionNames.isEmpty()) {
return true;
}
}

return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeMetaException(e, "Interrupted while checking for modified partitions");
} catch (ExecutionException e) {
// Just wrap this one in a runtime exception
throw new RuntimeMetaException(e, "Failed to check for modified partitions in parallel");
}
}

/**
* Creates a list of futures that compute partition names from affected files in snapshots.
* Each future processes a single snapshot to extract partition names from added/removed data
* and delete files.
* @param executor The executor service to submit tasks to.
* @param icebergTable The Iceberg table to inspect.
* @param relevantSnapshots The list of snapshots to process.
* @param latestSpecOnly when True, returns partitions with the current spec only;
* False - older specs only;
* Null - any spec
* @return A list of futures, each returning a set of partition names for a snapshot.
*/
private List<Future<Set<String>>> createPartitionNameFutures(ExecutorService executor,
org.apache.iceberg.Table icebergTable, List<Snapshot> relevantSnapshots, Boolean latestSpecOnly) {

return relevantSnapshots.stream()
.map(snapshot -> executor.submit(() -> {
FileIO io = icebergTable.io();
List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
snapshot.addedDataFiles(io),
snapshot.removedDataFiles(io),
snapshot.addedDeleteFiles(io),
snapshot.removedDeleteFiles(io))
.toList();
return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
}))
.toList();
}

/**
* Checks if a table has had new commits since a given snapshot that were not caused by compaction.
* @param icebergTable The Iceberg table to check.
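
A side note on hasModifiedPartitions above: the loop waits on its futures in submission order, so the short-circuit applies per submitted snapshot rather than strictly as results complete, and the remaining virtual threads keep running until the executor closes. If completion-order short-circuiting were wanted, an ExecutorCompletionService could be used; the sketch below is illustrative only and is not part of the patch.

import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderShortCircuitSketch {

  // Returns true as soon as any task reports a non-empty partition set, taking results
  // in the order they complete. Remaining tasks still run until the executor closes.
  static boolean anyNonEmpty(List<Callable<Set<String>>> tasks)
      throws InterruptedException, ExecutionException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      CompletionService<Set<String>> completionService = new ExecutorCompletionService<>(executor);
      tasks.forEach(completionService::submit);
      for (int i = 0; i < tasks.size(); i++) {
        if (!completionService.take().get().isEmpty()) {
          return true;
        }
      }
      return false;
    }
  }
}
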
@@ -309,4 +309,53 @@ public void testConcurrentInsertAndOverwrite() throws Exception {
HiveIcebergTestUtils.validateData(expected,
HiveIcebergTestUtils.valueForRow(schema, objects), 0);
}

@Test
public void testConcurrentInsertAndCompaction() throws Exception {
Assume.assumeTrue(formatVersion >= 2);

Schema schema = new Schema(
required(1, "i", Types.IntegerType.get()),
required(2, "p", Types.IntegerType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("i", 10).build();

// create and insert an initial batch of records
testTables.createTable(shell, "ice_t", schema, spec, fileFormat,
TestHelper.RecordsBuilder.newInstance(schema)
.add(1, 1)
.add(2, 2)
.add(10, 10)
.add(20, 20)
.add(40, 40)
.add(30, 30)
.build(),
formatVersion, Collections.emptyMap(), STORAGE_HANDLER_STUB);

String[] sql = new String[] {
"INSERT INTO ice_t SELECT i*100, p*100 FROM ice_t",
"ALTER TABLE ice_t compact 'MAJOR' and wait"
};
executeConcurrently(true, RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT, sql);

List<Object[]> objects =
shell.executeStatement("SELECT * FROM ice_t");
Assert.assertEquals(12, objects.size());
List<Record> expected = TestHelper.RecordsBuilder.newInstance(schema)
.add(2, 2)
.add(3, 3)
.add(11, 11)
.add(21, 21)
.add(31, 31)
.add(41, 41)
.add(101, 101)
.add(201, 201)
.add(1001, 1001)
.add(2001, 2001)
.add(3001, 3001)
.add(4001, 4001)
.build();
HiveIcebergTestUtils.validateData(expected,
HiveIcebergTestUtils.valueForRow(schema, objects), 0);
}
}