Skip to content

Commit

Permalink
Core, Spark: Avoid extra copies of manifests while optimizing V2 tabl…
Browse files Browse the repository at this point in the history
…es (apache#8928)
  • Loading branch information
aokolnychyi authored Oct 31, 2023
1 parent 0e09ac1 commit 50c5f26
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -57,7 +55,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
private final TableOperations ops;
private final Map<Integer, PartitionSpec> specsById;
private final long manifestTargetSizeBytes;
private final boolean snapshotIdInheritanceEnabled;

private final Set<ManifestFile> deletedManifests = Sets.newHashSet();
private final List<ManifestFile> addedManifests = Lists.newArrayList();
Expand All @@ -82,10 +79,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
this.manifestTargetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
this.snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -148,7 +141,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.sequenceNumber() == -1, "Sequence must be assigned during commit");

if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
if (canInheritSnapshotId() && manifest.snapshotId() == null) {
addedManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
Expand Down Expand Up @@ -85,6 +87,7 @@ public void accept(String file) {

private final TableOperations ops;
private final boolean strictCleanup;
private final boolean canInheritSnapshotId;
private final String commitUUID = UUID.randomUUID().toString();
private final AtomicInteger manifestCount = new AtomicInteger(0);
private final AtomicInteger attempt = new AtomicInteger(0);
Expand Down Expand Up @@ -116,6 +119,11 @@ protected SnapshotProducer(TableOperations ops) {
this.targetManifestSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
boolean snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
this.canInheritSnapshotId = ops.current().formatVersion() > 1 || snapshotIdInheritanceEnabled;
}

protected abstract ThisT self();
Expand Down Expand Up @@ -536,6 +544,10 @@ protected long snapshotId() {
return snapshotId;
}

/**
 * Returns whether this producer can rely on snapshot ID inheritance.
 *
 * <p>The flag is computed once in the constructor from the current table metadata: it is true
 * when the table format version is greater than 1, or when the snapshot ID inheritance table
 * property is enabled.
 *
 * @return true if snapshot ID inheritance can be used, false otherwise
 */
protected boolean canInheritSnapshotId() {
return canInheritSnapshotId;
}

private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -443,6 +444,14 @@ public void testBasicManifestReplacement() throws IOException {
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Assert.assertEquals(3, manifests.size());

if (formatVersion == 1) {
assertThat(manifests.get(0).path()).isNotEqualTo(firstNewManifest.path());
assertThat(manifests.get(1).path()).isNotEqualTo(secondNewManifest.path());
} else {
assertThat(manifests.get(0).path()).isEqualTo(firstNewManifest.path());
assertThat(manifests.get(1).path()).isEqualTo(secondNewManifest.path());
}

validateSummary(snapshot, 1, 1, 2, 0);

validateManifestEntries(
Expand Down Expand Up @@ -499,6 +508,9 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Assert.assertEquals(3, manifests.size());

assertThat(manifests.get(0).path()).isEqualTo(firstNewManifest.path());
assertThat(manifests.get(1).path()).isEqualTo(secondNewManifest.path());

validateSummary(snapshot, 1, 1, 2, 0);

validateManifestEntries(
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ The values of these properties are not persisted as a part of the table metadata.

| Property | Default | Description |
| --------------------------------------------- | -------- | ------------------------------------------------------------- |
| compatibility.snapshot-id-inheritance.enabled | false | Enables committing snapshots without explicit snapshot IDs |
| compatibility.snapshot-id-inheritance.enabled | false | Enables committing snapshots without explicit snapshot IDs (always true if the format version is > 1) |

## Catalog properties

Expand Down
2 changes: 1 addition & 1 deletion docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ Warning : Files added by this method can be physically deleted by Iceberg operat
| `changed_partition_count` | long | The number of partitions changed by this command |

{{< hint warning >}}
changed_partition_count will be 0 when table property `compatibility.snapshot-id-inheritance.enabled` is set to true
changed_partition_count will be 0 when table property `compatibility.snapshot-id-inheritance.enabled` is set to true or if the table format version is > 1.
{{< /hint >}}
#### Examples

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private void replaceManifests(
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (!snapshotIdInheritanceEnabled) {
if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.ValidationHelpers.snapshotIds;
import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestRewriteManifestsAction extends SparkTestBase {
Expand All @@ -75,25 +77,28 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
@Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}, formatVersion = {2}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {"true", "true"},
new Object[] {"false", "true"},
new Object[] {"true", "false"},
new Object[] {"false", "false"}
new Object[] {"true", "true", 1},
new Object[] {"false", "true", 1},
new Object[] {"true", "false", 2},
new Object[] {"false", "false", 2}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final String snapshotIdInheritanceEnabled;
private final String useCaching;
private final int formatVersion;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
public TestRewriteManifestsAction(
String snapshotIdInheritanceEnabled, String useCaching, int formatVersion) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
this.useCaching = useCaching;
this.formatVersion = formatVersion;
}

@Before
Expand All @@ -106,6 +111,7 @@ public void setupTableLocation() throws Exception {
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand All @@ -127,6 +133,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
public void testRewriteSmallManifestsNonPartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -184,6 +191,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
public void testRewriteManifestsWithCommitStateUnknownException() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -250,6 +258,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
public void testRewriteSmallManifestsPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -334,6 +343,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
public void testRewriteImportedManifests() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -390,6 +400,7 @@ public void testRewriteImportedManifests() throws IOException {
public void testRewriteLargeManifestsPartitionedTable() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -446,6 +457,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
public void testRewriteManifestsWithPredicate() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -513,6 +525,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {

@Test
public void testRewriteSmallManifestsNonPartitionedV2Table() {
assumeThat(formatVersion).isGreaterThan(1);

PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private void replaceManifests(
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (!snapshotIdInheritanceEnabled) {
if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.ValidationHelpers.snapshotIds;
import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestRewriteManifestsAction extends SparkTestBase {
Expand All @@ -75,25 +77,28 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
@Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}, formatVersion = {2}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {"true", "true"},
new Object[] {"false", "true"},
new Object[] {"true", "false"},
new Object[] {"false", "false"}
new Object[] {"true", "true", 1},
new Object[] {"false", "true", 1},
new Object[] {"true", "false", 2},
new Object[] {"false", "false", 2}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final String snapshotIdInheritanceEnabled;
private final String useCaching;
private final int formatVersion;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
public TestRewriteManifestsAction(
String snapshotIdInheritanceEnabled, String useCaching, int formatVersion) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
this.useCaching = useCaching;
this.formatVersion = formatVersion;
}

@Before
Expand All @@ -106,6 +111,7 @@ public void setupTableLocation() throws Exception {
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand All @@ -127,6 +133,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
public void testRewriteSmallManifestsNonPartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -184,6 +191,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
public void testRewriteManifestsWithCommitStateUnknownException() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -250,6 +258,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
public void testRewriteSmallManifestsPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -334,6 +343,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
public void testRewriteImportedManifests() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -390,6 +400,7 @@ public void testRewriteImportedManifests() throws IOException {
public void testRewriteLargeManifestsPartitionedTable() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -446,6 +457,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
public void testRewriteManifestsWithPredicate() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -513,6 +525,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {

@Test
public void testRewriteSmallManifestsNonPartitionedV2Table() {
assumeThat(formatVersion).isGreaterThan(1);

PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private void replaceManifests(
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (!snapshotIdInheritanceEnabled) {
if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Loading

0 comments on commit 50c5f26

Please sign in to comment.