Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core, API, Spec: Metadata Row Lineage #11948

Merged
merged 22 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,29 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
return null;
}

/**
* The row-id of the first newly added row in this snapshot. All rows added in this snapshot will
* have a row-id assigned to them greater than this value. All rows with a row-id less than this
* value were created in a snapshot that was added to the table (but not necessarily commited to
* this branch) in the past.
*
* @return the first row-id to be used in this snapshot or null if row lineage was not enabled
* when the table was created.
*/
default Long firstRowId() {
return null;
}

/**
* The total number of newly added rows in this snapshot. It should be the summation of {@link
RussellSpitzer marked this conversation as resolved.
Show resolved Hide resolved
* ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot.
*
* <p>This field is optional but is required when row lineage is enabled.
*
* @return the total number of new rows in this snapshot or null if the value was not stored.
*/
default Long addedRows() {
return null;
}
}
20 changes: 19 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class BaseSnapshot implements Snapshot {
private final Map<String, String> summary;
private final Integer schemaId;
private final String[] v1ManifestLocations;
private final Long firstRowId;
private final Long addedRows;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -61,7 +63,9 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
String manifestList,
Long firstRowId,
Long addedRows) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe worth adding an overriding constructor that takes these two new arguments? And the existing constructor would then just pass nulls for these? That way you wouldn't have to update all the constructor calls

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I think there is more of an argument to have multiple constructors since there isn't a builder, but I"m still hesitant to add another constructor here. What instances to we have where we want to make a new Snapshot and not have these fields explicitly specified?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess one alternative could be to introduce a Snapshot Builder instead of relying on the constructor. But if we decide to do that, I'd add that builder to the core module rather than the test module (as is being done by #11947).
It would be good to get some other opinions here in order to decide what the best path forward is

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough I think we can also punt on this for now and have it be a cleanup later since it's not really integral to this pr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to add a Builder or some other kinda refactoring, I'd also prefer to punt. The PR is fairly sizeable and I'd prefer to keep it focused on the row lineage core metadata changes (and all of this rather internal at the moment)

this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -71,6 +75,8 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.v1ManifestLocations = null;
this.firstRowId = firstRowId;
this.addedRows = addedRows;
}

BaseSnapshot(
Expand All @@ -91,6 +97,8 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = null;
this.v1ManifestLocations = v1ManifestLocations;
this.firstRowId = null;
this.addedRows = null;
}

@Override
Expand Down Expand Up @@ -128,6 +136,16 @@ public Integer schemaId() {
return schemaId;
}

@Override
public Long firstRowId() {
return firstRowId;
}

@Override
public Long addedRows() {
return addedRows;
}

private void cacheManifests(FileIO fileIO) {
if (fileIO == null) {
throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,4 +517,11 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.setCurrentVersionId(versionId);
}
}

class EnableRowLineage implements MetadataUpdate {
@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.enableRowLineage();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private MetadataUpdateParser() {}
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String ENABLE_ROW_LINEAGE = "enable-row-lineage";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI that this also requires changes in the OpenAPI spec

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -154,6 +155,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -249,6 +251,8 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
case ENABLE_ROW_LINEAGE:
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -322,6 +326,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
case ENABLE_ROW_LINEAGE:
return new MetadataUpdate.EnableRowLineage();
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public static TableMetadata replacePaths(
// TODO: update statistic file paths
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
metadata.changes());
metadata.changes(),
metadata.rowLineageEnabled(),
metadata.nextRowId());
}

private static Map<String, String> updateProperties(
Expand Down Expand Up @@ -186,7 +188,9 @@ private static List<Snapshot> updatePathInSnapshots(
snapshot.operation(),
snapshot.summary(),
snapshot.schemaId(),
newManifestListLocation);
newManifestListLocation,
snapshot.firstRowId(),
snapshot.addedRows());
newSnapshots.add(newSnapshot);
}
return newSnapshots;
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private SnapshotParser() {}
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";
private static final String FIRST_ROW_ID = "first-row-id";
private static final String ADDED_ROWS = "added-rows";

static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
generator.writeStartObject();
Expand Down Expand Up @@ -96,6 +98,14 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
}

if (snapshot.firstRowId() != null) {
generator.writeNumberField(FIRST_ROW_ID, snapshot.firstRowId());
}

if (snapshot.addedRows() != null) {
generator.writeNumberField(ADDED_ROWS, snapshot.addedRows());
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -158,6 +168,9 @@ static Snapshot fromJson(JsonNode node) {

Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node);

Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, node);
Long addedRows = JsonUtil.getLongOrNull(ADDED_ROWS, node);

if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
Expand All @@ -169,7 +182,9 @@ static Snapshot fromJson(JsonNode node) {
operation,
summary,
schemaId,
manifestList);
manifestList,
firstRowId,
addedRows);

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
Expand Down
30 changes: 29 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -282,6 +283,13 @@ public Snapshot apply() {
throw new RuntimeIOException(e, "Failed to write manifest list file");
}

Long addedRows = null;
Long lastRowId = null;
if (base.rowLineageEnabled()) {
addedRows = calculateAddedRows(manifests);
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an optimization so we can always just do this later (and arguably makes it a bit harder to read the code) but instead of waiting until all the manifests are written what if we set the addedRows as we add manifests to the writer in the try with-resources-above

Something like

// remove the writer.addAll
for (ManifestFile manifest: manifestFIles) {
 if (rowLineageEnabled()) {
     if (manifest.snapshotId() == null || (manifest.snapshotId() == this.snapshotId) {
                     Preconditions.checkArgument(
                  manifest.addedRowsCount() != null,
                  "Cannot determine number of added rows in snapshot because"
                      + " the entry for manifest %s is missing the field `added-rows-count`",
          addedRowsCount += manifest.addedRowsCount();
     }
}
      writer.add(manifest);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth thinking about, but here I'm not sure it's that much of an optimization. The amount of new manifests should be pretty small, even if it was thousands of manifests the overhead should be very low imho and we already have them in memory.

lastRowId = base.nextRowId();
}

return new BaseSnapshot(
sequenceNumber,
snapshotId(),
Expand All @@ -290,7 +298,27 @@ public Snapshot apply() {
operation(),
summary(base),
base.currentSchemaId(),
manifestList.location());
manifestList.location(),
lastRowId,
addedRows);
}

private Long calculateAddedRows(List<ManifestFile> manifests) {
return manifests.stream()
.filter(
manifest ->
manifest.snapshotId() == null
|| Objects.equals(manifest.snapshotId(), this.snapshotId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use snapshotId() method (instead of this.snapshotId)?

there is no practical difference, since earlier call of snapshotId() already set it. but it is probably better for safety to avoid such implicit code ordering dependency

.mapToLong(
manifest -> {
Preconditions.checkArgument(
manifest.addedRowsCount() != null,
"Cannot determine number of added rows in snapshot because"
+ " the entry for manifest %s is missing the field `added-rows-count`",
manifest.path());
amogh-jahagirdar marked this conversation as resolved.
Show resolved Hide resolved
return manifest.addedRowsCount();
})
.sum();
}

protected abstract Map<String, String> summary();
Expand Down
Loading