API: Add a scan for changes #4870
Conversation
I spent some time thinking and decided that separate scan task types would work a little bit better. Then we can adapt our readers to handle different task types independently of whether we use a utility or a metadata table scan approach (or both).
api/src/main/java/org/apache/iceberg/AddedDataFileScanTask.java
api/src/main/java/org/apache/iceberg/IncrementalChangelogScan.java
api/src/main/java/org/apache/iceberg/DeleteMarkersScanTask.java
```java
/**
 * Returns the snapshot ID in which the changes were committed.
 */
long commitSnapshotId();
```
Do we restrict this scan task to one snapshot? I feel like it and its subclasses can be used for a scan across multiple snapshots, which is useful, for example, to scan position deletes across snapshots. In that case, commitSnapshotId is going to be optional.
When we scan for changes across snapshots, I think we should output all changes per snapshot by default. If I understand you correctly, you want to support use cases where the user is only interested in net changes across snapshots. I think that's possible too, but the records produced by a task would still have some snapshot ID in which they were committed.
For instance, we add data file A in S1. Then we add a delete file D in S2 that matches data file A. In this case, we would output an AddedDataFileScanTask with commit snapshot S1, but it would have delete file D associated with it so that we only output records that are still live in S2.
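The scenario above can be sketched as a small simulation (the class and method names here are illustrative, not the Iceberg API): data file A is added in S1, a delete file D added in S2 matches one of A's rows, and the task for A keeps commit snapshot S1 while applying D so that only still-live rows are emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: an added-file task committed in S1 also carries
// delete file D from S2, so readers emit only rows live as of S2.
public class NetAddExample {
  static List<Integer> emittedRows(List<Integer> addedRows, Set<Integer> deletedByD) {
    List<Integer> live = new ArrayList<>();
    for (int row : addedRows) {
      if (!deletedByD.contains(row)) {
        live.add(row); // emitted as an insert committed in S1
      }
    }
    return live;
  }

  public static void main(String[] args) {
    List<Integer> fileA = List.of(1, 2, 3); // rows added in S1
    Set<Integer> deleteD = Set.of(2);       // row matched by D in S2
    System.out.println(emittedRows(fileA, deleteD)); // prints [1, 3]
  }
}
```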
What will commitSnapshotId() return in that case? S1 or S2, or a list of snapshot IDs? I'd think the snapshot IDs may not be useful if we want to calculate the net changes across snapshots. For example, IncrementalAppendScan could be considered one of the cases that outputs scan tasks across snapshots, which doesn't need the snapshot ID.
One solution is to add another layer of interface to support net changes across snapshots. For example, we can use ChangelogScanTask as a parent for both the single-snapshot and multi-snapshot cases, and then create a new interface SnapshotChangelogScanTask that extends it for the single-snapshot use case.
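The suggested hierarchy could be sketched roughly as follows (names are taken from this discussion but the shapes are illustrative, not the final Iceberg API): the parent carries only what is common to all cases, and the single-snapshot specialization is the only place a commit snapshot is guaranteed.

```java
// Hypothetical hierarchy sketch based on the suggestion above.
public class HierarchySketch {
  enum Operation { INSERT, DELETE }

  // Common parent for single-snapshot and multi-snapshot (net change) tasks.
  interface ChangelogScanTask {
    Operation operation();
  }

  // Single-snapshot specialization: only here is a commit snapshot guaranteed.
  interface SnapshotChangelogScanTask extends ChangelogScanTask {
    long commitSnapshotId();
    int commitOrder();
  }

  public static void main(String[] args) {
    SnapshotChangelogScanTask task = new SnapshotChangelogScanTask() {
      public Operation operation() { return Operation.INSERT; }
      public long commitSnapshotId() { return 1L; }
      public int commitOrder() { return 0; }
    };
    System.out.println(task.operation() + " @ snapshot " + task.commitSnapshotId());
    // prints INSERT @ snapshot 1
  }
}
```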
I think the current solution is good enough for the CDC implementation. But since it is an interface change, we may want to make sure it is extendable for future use cases. If supporting net changes across snapshots is something we will do, we may as well choose the right hierarchy and names now.
After thinking more about this, I agree that commit snapshot ID doesn't make sense for net changes across multiple snapshots in some cases. For instance, when we have 2 snapshots that add delete files for the same data file. In this case, we will have a single task with one data file and two deletes added in multiple snapshots.
Having said that, I am still not sure having separate tasks is a good idea. Apart from more tasks to support in readers, what schema would they produce? Our changelog metadata table should have the same schema independently of whether we include only net changes or all changes per snapshot.
We could make the commit snapshot ID null when only net changes are produced, skip the commit snapshot ID in both cases, or offer some sort of metadata map where the snapshot ID may or may not be set.
Let me think more about this.
It's OK to leave commitSnapshotId and commitOrder null in the case of net changes. I assume the intent of having the commitSnapshotId and commitOrder methods is to enable readers to project them:
| ID | Name | Age | _record_type | _commit_snapshot_id | _commit_timestamp | _commit_order |
|----|------|-----|--------------|---------------------|---------------------|---------------|
| 1  | Amy  | 21  | Insert       | 8563478696506531065 | 2021-01-23 04:30:45 | 34 |
@flyrain @rdblue, I agree with you, but what about DeletedRowsScanTask? Suppose I have a data file file_A:
id
--
1
2
3
4
5
And then I have two delete snapshots S1 and S2. S1 deletes IDs 1, 2 and S2 deletes IDs 3, 4. If we are to assign snapshot IDs to tasks, we will have to produce two DeletedRowsScanTasks:
- file_A & deletes from S1 (inherit the snapshot ID and commit order from S1)
- file_A & deletes from S2 (inherit the snapshot ID and commit order from S2)
Even if we combine these two tasks in one group, it will be suboptimal as we will scan through the same data file twice. It would be more efficient to combine these deletes against the same data file and do one pass to determine deleted records.
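A minimal sketch of that single-pass idea (all names hypothetical, not Iceberg code): instead of scanning file_A once per snapshot, walk its rows once and attribute each deleted row to the first snapshot whose deletes match it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CombinedDeletesExample {
  // One pass over the data file: for each row, find the first snapshot whose
  // deletes match it, instead of re-reading the file once per snapshot.
  static Map<Integer, String> deletedRows(List<Integer> rows,
                                          LinkedHashMap<String, Set<Integer>> deletesPerSnapshot) {
    Map<Integer, String> result = new LinkedHashMap<>();
    for (int row : rows) {
      for (Map.Entry<String, Set<Integer>> e : deletesPerSnapshot.entrySet()) {
        if (e.getValue().contains(row)) {
          result.put(row, e.getKey()); // row deleted, attributed to this snapshot
          break;
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    LinkedHashMap<String, Set<Integer>> deletes = new LinkedHashMap<>();
    deletes.put("S1", Set.of(1, 2)); // deletes committed in S1
    deletes.put("S2", Set.of(3, 4)); // deletes committed in S2
    Map<Integer, String> out = deletedRows(List.of(1, 2, 3, 4, 5), deletes);
    System.out.println(out); // prints {1=S1, 2=S1, 3=S2, 4=S2}
  }
}
```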
@aokolnychyi, I'd say that's a special form of combining. We could introduce a CombinedDeletedRowsScanTask that takes care of the snapshot handling.
We won't be able to extend ChangelogScanTask in CombinedDeletedRowsScanTask, though? Unless we remove the commit snapshot ID from ChangelogScanTask?
I see your point, but I'm thinking that we would just throw an exception if it were called. I'd also be up for returning null and passing each sub-task's ID and ordinal separately.
An exception sounds reasonable. Using nulls in a public API is also not the best idea.
I've simplified this PR to always resolve equality deletes for now. I feel like it is easier to get this working first and add the option of not resolving equality deletes later. Right now, all added scan tasks fit well into …
LGTM
```java
/**
 * A scan task for deleted data records generated by adding delete files to the table.
 */
public interface DeletedRowsScanTask extends ChangelogScanTask {
```
I've discovered one issue implementing this API locally. Specifically, we need to differentiate between historical deletes and new deletes in this task. If we add a new delete file, we should NOT output records that were deleted in earlier snapshots. We need to apply old deletes first and then see what was matched by the new deletes. It's not something we can express with FileScanTask. I could add another method for new deletes and return deletes from earlier snapshots in the existing deletes() method, but that sounds awkward.
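A toy illustration of the distinction (the class and method names are hypothetical, not Iceberg code): apply the historical deletes first, then report only the rows newly matched by the deletes added in the current snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class NewVsHistoricalDeletes {
  // Historical deletes are applied first; only rows that were still live and
  // are matched by the newly added deletes count as deleted in this snapshot.
  static List<Integer> newlyDeleted(List<Integer> rows,
                                    Set<Integer> existingDeletes,
                                    Set<Integer> addedDeletes) {
    List<Integer> out = new ArrayList<>();
    for (int row : rows) {
      if (!existingDeletes.contains(row) && addedDeletes.contains(row)) {
        out.add(row);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // rows 1..5; row 2 was already deleted in an earlier snapshot,
    // and the current snapshot adds deletes for rows 2 and 3
    List<Integer> result = newlyDeleted(List.of(1, 2, 3, 4, 5), Set.of(2), Set.of(2, 3));
    System.out.println(result); // prints [3]
  }
}
```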
@aokolnychyi, is it necessary to suppress the new deletes? We got new deletes because some writer deleted the same row again.
Are you talking about position deletes within the same snapshot?
I thought for that we were going to check the sequence number and adjust behavior. So there's no need for a flag.
@rdblue, no. I am talking about the same row being deleted multiple times. That can happen both with position or equality deletes.
Case 1: we have two concurrent merge-on-read DELETEs in Spark. If the operations remove the same row, we allow both commits to succeed. The correct behavior would be to report the matching row as deleted only in the first snapshot as the second snapshot added a delete for an already deleted row.
Case 2: we have two snapshots that both add equality deletes for data file A. When we output changes for the second snapshot, the correct behavior would be to output only the deleted records that were still live (i.e. apply the equality delete added in the first snapshot before checking what was removed in the second snapshot).
@stevenzwu, not sure yet. I guess that would be the correct behavior but may complicate things quite a bit as we need to load historic deletes too.
> The problem happens when we only generate CDC records from S2

I feel this is an invalid case. A CDC read should be incremental and continuous. If an Iceberg table is the CDC source, I would imagine the CDC read should start with a full table scan and then transition to the incremental changelog scan. We shouldn't read S2 without reading S1. Even if the CDC read starts from S2 and continues forward, it is also OK to emit a delete in my mind.
+1
It has nothing to do with primary key(s). Iceberg allows two or more rows to be identical. For example, here are two rows with the same values in each column (id, name, age):
- r1 (1, "Alice", 20) at pos 0 of data file df1
- r2 (1, "Alice", 20) at pos 1 of data file df1
In the V2 format, two identical rows should not exist without corresponding delete data; that would be invalid.
The user won't be able to distinguish the following two scenarios:
- Both S1 and S2 deleted r1.
- S1 deleted r1, while S2 deleted r2. Or vice versa.

Based on the above, this case is invalid. Even if it happens, for an equality delete the result should be "both S1 and S2 deleted r1"; for a position delete, there is specific data at a specific file and position that should be deleted.
I want to give specific examples for cases I mentioned earlier.
Concurrent merge-on-read DELETEs in Spark
We have data_file_A:
1, "a" (pos 0)
2, "b" (pos 1)
3, "c" (pos 2)
4, "d" (pos 3)
Suppose we have two concurrent DELETEs (d1 and d2). The first DELETE removes records at pos 0 and pos 2. The second one concurrently removes pos 0 and pos 1. We allow the second DELETE to commit because it is not in conflict.
What should our changelog look like?
Changelog for d1:
deleted, 1, "a"
deleted, 3, "c"
Changelog for d2:
deleted, 2, "b"
I think 1, "a" should only appear in the changelog for d1, even though a delete file added in d2 refers to it.
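The position-delete case above can be checked with a tiny simulation (a hypothetical helper, not Iceberg code): the changelog for each DELETE is the set of positions it removes minus the positions already deleted by earlier snapshots.

```java
import java.util.Set;
import java.util.TreeSet;

public class ConcurrentPositionDeletes {
  // Positions reported by a snapshot's changelog: its own deletes, minus
  // positions that were already dead before it committed.
  static Set<Integer> changelogFor(Set<Integer> snapshotDeletes, Set<Integer> earlierDeletes) {
    Set<Integer> out = new TreeSet<>(snapshotDeletes);
    out.removeAll(earlierDeletes); // suppress rows already deleted earlier
    return out;
  }

  public static void main(String[] args) {
    Set<Integer> d1 = Set.of(0, 2); // first DELETE removes pos 0 and 2
    Set<Integer> d2 = Set.of(0, 1); // second DELETE removes pos 0 and 1
    System.out.println(changelogFor(d1, Set.of())); // prints [0, 2]
    System.out.println(changelogFor(d2, d1));       // prints [1]
  }
}
```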
Equality deletes against the same data file
We have data_file_A:
1, "hr" (pos 0)
2, "sw" (pos 1)
3, "hr" (pos 2)
4, "sw" (pos 3)
Suppose we have a GDPR delete d1 that adds an equality delete for 1 and a concurrent equality delete d2 that removes all records in the hr department.
What should our changelog look like?
Changelog for d1:
deleted, 1, "hr"
Changelog for d2:
deleted, 3, "hr"
I don't think outputting 1, "hr" again in d2 would be correct, as that record wasn't live when d2 committed.
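The equality-delete case works the same way, except each snapshot's delete is a predicate rather than a set of positions. A sketch (names and shapes are hypothetical, not Iceberg code): rows matched by an earlier snapshot's equality delete are suppressed before computing the later snapshot's changelog.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class EqualityDeleteExample {
  record Row(int id, String dept) {}

  // Changelog for a snapshot: rows matched by its equality delete that were
  // still live, i.e. not matched by deletes from earlier snapshots.
  static List<Row> changelog(List<Row> data, Predicate<Row> earlier, Predicate<Row> added) {
    List<Row> out = new ArrayList<>();
    for (Row r : data) {
      if (!earlier.test(r) && added.test(r)) {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> fileA = List.of(new Row(1, "hr"), new Row(2, "sw"),
                              new Row(3, "hr"), new Row(4, "sw"));
    Predicate<Row> d1 = r -> r.id() == 1;           // GDPR delete of id 1
    Predicate<Row> d2 = r -> r.dept().equals("hr"); // delete all hr rows
    System.out.println(changelog(fileA, r -> false, d1)); // prints [Row[id=1, dept=hr]]
    System.out.println(changelog(fileA, d1, d2));         // prints [Row[id=3, dept=hr]]
  }
}
```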
We can make this configurable if we want as I don't see this being an issue if Flink CDC is the only writer.
I think either interpretation of the concurrent delete case is fine. For the position delete example, d1 and d2 are concurrent and based on the same underlying data, so I'm fine saying BOTH of them deleted a row. It is a little strange to say d1 deleted pos 0 and d2 didn't because d2 actually did encode that delete and would have deleted row 0 if it had won the race to commit. But there is also a strong argument that we should produce a delete for each row once to avoid confusion when people consume these tables.
I think as long as the implementation documents what it does, either one is fine.
We discussed refactoring …
```java
 *
 * @return an incremental changelog scan
 */
default IncrementalChangelogScan newIncrementalChangelogScan() {
```
Do we want changelog or simply change?
@stevenzwu, how do you feel about the names? You know more about CDC use cases. I think I'd slightly prefer shorter names like ChangeScan and ChangeScanTask, but if changelog is really widely used, I have no problem using it as well.
Seems changelog is pretty popular. Let's keep it as-is.
Overall, I think this looks great.
api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java
```java
ThisT fromSnapshotExclusive(long fromSnapshotId);

/**
 * Instructs this scan to look for changes up to a particular snapshot (inclusive).
```
This is about setting the "end" snapshot ID, correct? We should probably note that here since that vocabulary is used in the previous methods.
I aligned methods above.
This all works for me; I'm only requesting a few mods to the javadocs. The approach and API should work well.
Looks great. Thanks for all the work and discussion, everyone!
This PR is an attempt to support a scan for CDC based on @flyrain's proposal and PR.