
HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations #6292

Open
difin wants to merge 2 commits into apache:master from difin:iceberg_compaction_concurrency_fix

Conversation

@difin (Contributor) commented Feb 3, 2026


What changes were proposed in this pull request?

Fixing concurrency issues between compaction and concurrent write operations.

Why are the changes needed?

It was found in downstream testing that when Hive Iceberg compaction runs in parallel with Spark write operations on the same table, compaction sometimes produces wrong results. Before committing, Hive collects the uncompacted data and delete files that the newly compacted files should replace in the table or partition. The issue is that Hive collects those files from the latest Iceberg snapshot instead of from the snapshot the compaction originally read. Because of concurrent write operations, the latest snapshot may contain different files, which can lead to data corruption.

Does this PR introduce any user-facing change?

No

How was this patch tested?

The fix was validated downstream with concurrent Spark write operations and Hive Iceberg compaction.
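The race described above can be modeled with a minimal toy sketch. All names below are hypothetical and the snapshot map is a stand-in, not Iceberg's API; it only illustrates why the files to replace must be collected from the snapshot the compaction read, not from the latest one:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Iceberg's API): compaction must collect the files it
// replaces from the snapshot it originally read, because a concurrent
// writer may have advanced the table to a newer snapshot in the meantime.
public class SnapshotPinningSketch {
    public static void main(String[] args) {
        // snapshotId -> data files visible in that snapshot
        Map<Long, List<String>> snapshots = new HashMap<>();

        // Snapshot 1: the state the compaction job read and rewrote.
        long compactionSnapshotId = 1L;
        snapshots.put(compactionSnapshotId, List.of("a.parquet", "b.parquet"));

        // A concurrent Spark writer commits snapshot 2 with a new file.
        long latestSnapshotId = 2L;
        snapshots.put(latestSnapshotId, List.of("a.parquet", "b.parquet", "c.parquet"));

        // Buggy: collecting from the latest snapshot would also replace
        // c.parquet, whose rows the compacted files do not contain.
        List<String> buggy = snapshots.get(latestSnapshotId);
        // Fixed: pin the collection to the compaction's starting snapshot.
        List<String> fixed = snapshots.get(compactionSnapshotId);

        System.out.println(buggy.contains("c.parquet")); // true: c.parquet would be lost
        System.out.println(fixed.contains("c.parquet")); // false: c.parquet survives
    }
}
```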

```diff
      IcebergCompactionUtil.getDataFiles(table, snapshotId, partitionPath, fileSizeThreshold);
  List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
-     IcebergCompactionUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();
+     IcebergCompactionUtil.getDeleteFiles(table, snapshotId, partitionPath) : Collections.emptyList();
```
@deniskuzZ (Member) commented Feb 3, 2026
Please add a test.
As an example, you could use TestConflictingDataFiles#testConflictingUpdateAndDelete.

@difin (Contributor, Author)

Working on it.

```diff
  Table deletesTable =
      MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
- CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
+ CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().useSnapshot(snapshotId).planFiles();
```
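The pinning above can be illustrated with a toy scan builder. `ToyScan` and its fields are hypothetical names, not Iceberg's API; the sketch only mimics `useSnapshot`-style semantics, where an unpinned `planFiles()` follows whatever the latest snapshot contains:

```java
import java.util.List;
import java.util.Map;

// Toy scan builder (not Iceberg's API) illustrating useSnapshot-style
// pinning: unpinned scans read the latest snapshot, which may have been
// advanced by a concurrent writer.
class ToyScan {
    private final Map<Long, List<String>> snapshots;
    private final long latestId;
    private Long pinnedId; // null means "use the latest snapshot"

    ToyScan(Map<Long, List<String>> snapshots, long latestId) {
        this.snapshots = snapshots;
        this.latestId = latestId;
    }

    // Pin the scan to a specific snapshot, like Iceberg's Scan#useSnapshot.
    ToyScan useSnapshot(long snapshotId) {
        this.pinnedId = snapshotId;
        return this;
    }

    List<String> planFiles() {
        return snapshots.get(pinnedId != null ? pinnedId : latestId);
    }
}

public class UseSnapshotSketch {
    public static void main(String[] args) {
        Map<Long, List<String>> snaps = Map.of(
            1L, List.of("pos-delete-1.parquet"),
            2L, List.of("pos-delete-1.parquet", "pos-delete-2.parquet"));

        // Unpinned: drifts to the latest snapshot (2 delete files).
        System.out.println(new ToyScan(snaps, 2L).planFiles().size());
        // Pinned: stays on the compaction's snapshot (1 delete file).
        System.out.println(new ToyScan(snaps, 2L).useSnapshot(1L).planFiles().size());
    }
}
```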
@deniskuzZ (Member) commented Feb 3, 2026
Why do you use newBatchScan() here but newScan() in getDataFiles? Should we use BatchScan in both places?

@difin (Contributor, Author) commented Feb 4, 2026

It was in the existing code. Changed to use BatchScan in both places.

@difin force-pushed the iceberg_compaction_concurrency_fix branch from b842441 to 5433c78 on February 4, 2026 at 21:04
sonarqubecloud bot commented Feb 4, 2026

