API: Add a scan for changes #4870

Merged: 2 commits into apache:master on Jul 3, 2022

Conversation

aokolnychyi (Contributor)

This PR is an attempt to support a scan for CDC based on @flyrain's proposal and PR.

@aokolnychyi force-pushed the change-scan branch 3 times, most recently from 5027453 to ec0c332 on June 1, 2022 at 20:27

aokolnychyi (Contributor, Author) commented Jun 1, 2022

I spent some time thinking and decided that separate scan task types would work a little better. Then we can adapt our readers to handle the different task types, regardless of whether we use a utility or a metadata table scan approach (or both).
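
For illustration, a rough sketch of how a reader might dispatch on separate changelog task types; the task names follow this discussion and are assumptions, not necessarily the final API:

```java
// Hypothetical reader-side dispatch; readAddedRows/readDeletedRows are placeholder helpers.
void process(ChangelogScanTask task) {
  if (task instanceof AddedDataFileScanTask) {
    // rows added by data files committed in the task's snapshot
    readAddedRows((AddedDataFileScanTask) task);
  } else if (task instanceof DeletedRowsScanTask) {
    // rows removed by delete files committed in the task's snapshot
    readDeletedRows((DeletedRowsScanTask) task);
  } else {
    throw new UnsupportedOperationException("Unsupported changelog task: " + task.getClass().getName());
  }
}
```

Each branch can then use a reader tailored to that task type, which is the per-type independence mentioned above.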

/**
* Returns the snapshot ID in which the changes were committed.
*/
long commitSnapshotId();
flyrain (Contributor) commented Jun 2, 2022

Do we restrict this scan task to one snapshot? I feel like this interface and its subclasses could be used for a scan across multiple snapshots, which is useful, for example, to scan position deletes across snapshots. In that case, commitSnapshotId would have to be optional.

aokolnychyi (Contributor, Author)

When we scan for changes across snapshots, I think we should output all changes per snapshot by default. If I understand you correctly, you want to support use cases where the user is only interested in the net changes across snapshots. I think that's possible too, but the records produced by a task would still carry the snapshot ID in which they were committed.

For instance, we add data file A in S1. Then we add a delete file D in S2 that matches data file A. In this case, we would output an AddedDataFileScanTask with commit snapshot S1, but it would have delete file D associated with it so that we only output the records that are still live as of S2.

Contributor

What will commitSnapshotId() return in that case: s1, s2, or a list of snapshot IDs? I'd think the snapshot IDs may not be useful if we want to calculate the net changes across snapshots. For example, IncrementalAppendScan could be considered one of the cases that output scan tasks across snapshots, and it doesn't need the snapshot ID.

One solution is to add another layer of interfaces to support net changes across snapshots. For example, we could use ChangelogScanTask as a parent for both the single-snapshot and multi-snapshot cases, and then create a new interface SnapshotChangelogScanTask that extends it for the single-snapshot use case.

I think the current solution is good enough for the CDC implementation. But since it is an interface change, we may want to make sure it is extensible for future use cases. If supporting net changes across snapshots is something we plan to do, we may want to choose the right hierarchy and names now.

aokolnychyi (Contributor, Author)

After thinking more about this, I agree that commit snapshot ID doesn't make sense for net changes across multiple snapshots in some cases. For instance, when we have 2 snapshots that add delete files for the same data file. In this case, we will have a single task with one data file and two deletes added in multiple snapshots.

Having said that, I am still not sure having separate tasks is a good idea. Apart from more task types to support in readers, what schema would we produce? Our changelog metadata table should have the same schema regardless of whether we include only net changes or all changes for each snapshot.

We could make the commit snapshot ID null when only net changes are produced, skip the commit snapshot ID in both cases, or offer some sort of metadata map where the snapshot ID may or may not be set.

Let me think more about this.

Contributor

It's OK to leave commitSnapshotId and commitOrder null in the case of net changes. I assume the intent of having the commitSnapshotId and commitOrder methods is to enable readers to project them.

| ID | Name | Age | _record_type | _commit_snapshot_id | _commit_timestamp | _commit_order |
|----|------|-----|--------------|---------------------|---------------------|---------------|
| 1  | Amy  | 21  | Insert       | 8563478696506531065 | 2021-01-23 04:30:45 | 34            |

Considering the CDC record format above, does the reader need `commitTimestamp` as well?

aokolnychyi (Contributor, Author)

@flyrain @rdblue, I agree with you but what about DeletedRowsScanTask?

Suppose I have a data file file_A:

id
--
1
2
3
4
5

And then I have two delete snapshots, S1 and S2. S1 deletes IDs 1 and 2, and S2 deletes IDs 3 and 4. If we are to assign snapshot IDs to tasks, we will have to produce two DeletedRowsScanTasks:

file_A & deletes from S1 (inherit the snapshot ID and commit order from S1)
file_A & deletes from S2 (inherit the snapshot ID and commit order from S2)

Even if we combine these two tasks into one group, it will be suboptimal because we will scan the same data file twice. It would be more efficient to combine the deletes against the same data file and do a single pass to determine the deleted records.

Contributor

@aokolnychyi, I'd say that's a special form of combining. We could introduce a CombinedDeletedRowsScanTask that takes care of the snapshot handling.

aokolnychyi (Contributor, Author)

We won't be able to extend ChangelogScanTask in CombinedDeletedRowsScanTask, though? Unless we remove the commit snapshot ID from ChangelogScanTask?

Contributor

I see your point, but I'm thinking that we would just throw an exception if it were called. I'd also be up for returning null and passing each sub-task's ID and ordinal separately.

aokolnychyi (Contributor, Author) commented Jul 1, 2022

An exception sounds reasonable. Using nulls in a public API is also not the best idea.

aokolnychyi (Contributor, Author) commented Jun 6, 2022

I've simplified this PR to always resolve equality deletes for now. I feel like it is easier to get this working first and add the option of not resolving equality deletes later. Right now, all added scan tasks fit well into FileScanTask.

stevenzwu (Contributor) left a comment

LGTM

/**
* A scan task for deleted data records generated by adding delete files to the table.
*/
public interface DeletedRowsScanTask extends ChangelogScanTask {
aokolnychyi (Contributor, Author)

I've discovered one issue while implementing this API locally. Specifically, we need to differentiate between historical deletes and new deletes in this task. If we add a new delete file, we should NOT output records that were already deleted in earlier snapshots. We need to apply the old deletes first and then see what was matched by the new deletes. This is not something we can express with FileScanTask. I could add another method for new deletes and return deletes from earlier snapshots in the existing deletes() method, but that sounds awkward.
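
For illustration only, one possible shape of such a split, with assumed method names (addedDeletes for delete files committed in the task's snapshot, existingDeletes for older ones); this is not the final API:

```java
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;

// Sketch of a task that separates new deletes from historical ones (method names assumed).
public interface DeletedRowsScanTask extends ChangelogScanTask {
  /** Data file whose deleted rows are emitted by this task. */
  DataFile file();

  /** Delete files committed in this task's snapshot; rows they match are reported as deleted. */
  List<DeleteFile> addedDeletes();

  /** Delete files from earlier snapshots; rows they match were already deleted and are not reported again. */
  List<DeleteFile> existingDeletes();
}
```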

Contributor

@aokolnychyi is it necessary to suppress the new deletes? We got new deletes because some writer deleted the same row again.

Contributor

Are you talking about position deletes within the same snapshot?

I thought for that we were going to check the sequence number and adjust behavior. So there's no need for a flag.

aokolnychyi (Contributor, Author)

@rdblue, no. I am talking about the same row being deleted multiple times. That can happen both with position or equality deletes.

Case 1: we have two concurrent merge-on-read DELETEs in Spark. If the operations remove the same row, we allow both commits to succeed. The correct behavior would be to report the matching row as deleted only in the first snapshot as the second snapshot added a delete for an already deleted row.

Case 2: we have two snapshots that both add equality deletes for data file A. When we output changes for the second snapshot, the correct behavior would be to output only the deleted records that were still live (i.e. apply the equality delete added in the first snapshot before checking what was removed in the second snapshot).

aokolnychyi (Contributor, Author)

@stevenzwu, not sure yet. I guess that would be the correct behavior, but it may complicate things quite a bit because we would need to load historical deletes too.

Contributor

> The problem happens when we only generate CDC records from s2

I feel this is an invalid case. A CDC read should be incremental and continuous. If an Iceberg table is the CDC source, I would imagine the CDC read should start with a full table scan and then transition to the incremental changelog scan. We shouldn't read S2 without reading S1. Even if the CDC read started from S2 and continued forward, it is also OK to emit a delete in my mind.

+1

Contributor

> It has nothing to do with primary key(s). Iceberg allows two or more rows to be identical. For example, here are two rows with the same values in each column (id, name, age).
>
> r1 (1, "Alice", 20) at pos 0 of data file df1
> r2 (1, "Alice", 20) at pos 1 of data file df1

For the V2 format, two identical rows should not exist without corresponding delete data; that case is invalid.

> The user won't be able to distinguish the following two scenarios.
>
> 1. Both s1 and s2 deleted r1.
> 2. S1 deleted r1, while s2 deleted r2. Or vice versa.

Based on the above, this case is invalid. Even if it happens, for equality deletes the outcome should be that both s1 and s2 deleted r1; for position deletes, the delete data specifies exactly which file and position is deleted.

aokolnychyi (Contributor, Author)

I want to give specific examples for cases I mentioned earlier.

Concurrent merge-on-read DELETEs in Spark

We have data_file_A:

1, "a" (pos 0)
2, "b" (pos 1)
3, "c" (pos 2)
4, "d" (pos 3)

Suppose we have two concurrent DELETEs (d1 and d2). The first DELETE removes the records at pos 0 and pos 2. The second one concurrently removes pos 0 and pos 1. We allow the second DELETE to commit because it is not in conflict.

What should our changelog look like?

Changelog for d1:

deleted, 1, "a"
deleted, 3, "c"

Changelog for d2:

deleted, 2, "b"

I think 1, "a" should only appear in d1, even though a delete file added in d2 also refers to it.

Equality deletes against the same data file

We have data_file_A:

1, "hr" (pos 0)
2, "sw" (pos 1)
3, "hr" (pos 2)
4, "sw" (pos 3)

Suppose we have a GDPR delete d1 that adds an equality delete for ID 1, and a concurrent equality delete d2 that removes all records in the hr department.

What should our changelog look like?

Changelog for d1:

deleted, 1, "hr"

Changelog for d2:

deleted, 3, "hr"

I don't think outputting 1, "hr" again in d2 would be correct as that record wasn't live when d2 committed.
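
A minimal sketch of the filtering these examples imply; the helpers (readRows, isDeleted, emitDeleted) are hypothetical, and only the ordering matters: apply deletes from earlier snapshots first, then report rows matched by the deletes added in the snapshot being scanned.

```java
// Hypothetical helpers for illustration; only the ordering of the two checks matters.
void emitDeletedRows(DataFile dataFile,
                     List<DeleteFile> existingDeletes,
                     List<DeleteFile> addedDeletes,
                     long commitSnapshotId) {
  for (Record row : readRows(dataFile)) {
    if (isDeleted(row, existingDeletes)) {
      continue; // already deleted before this snapshot (e.g. 1, "hr" removed by d1)
    }
    if (isDeleted(row, addedDeletes)) {
      emitDeleted(row, commitSnapshotId); // row was still live and is removed in this snapshot
    }
  }
}
```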

aokolnychyi (Contributor, Author)

We can make this configurable if we want as I don't see this being an issue if Flink CDC is the only writer.

Contributor

I think either interpretation of the concurrent delete case is fine. For the position delete example, d1 and d2 are concurrent and based on the same underlying data, so I'm fine saying BOTH of them deleted a row. It is a little strange to say d1 deleted pos 0 and d2 didn't because d2 actually did encode that delete and would have deleted row 0 if it had won the race to commit. But there is also a strong argument that we should produce a delete for each row once to avoid confusion when people consume these tables.

I think as long as the implementation documents what it does, either one is fine.

aokolnychyi (Contributor, Author)

We discussed refactoring FileScanTask using a parent interface during the sync. I gave it a try in #5077.
I will need a little bit more time to iterate on it but it is open for early feedback.

*
* @return an incremental changelog scan
*/
default IncrementalChangelogScan newIncrementalChangelogScan() {
Contributor

Do we want changelog or simply change?

aokolnychyi (Contributor, Author) commented Jun 30, 2022

@stevenzwu, how do you feel about the names? You know more about CDC use cases. I think I'd slightly prefer shorter names like ChangeScan and ChangeScanTask but if changelog is really widely used, I have no problem using it as well.

aokolnychyi (Contributor, Author)

Seems changelog is pretty popular. Let's keep it as-is.

rdblue (Contributor) left a comment

Overall, I think this looks great.

@rdblue added this to the Iceberg 0.14.0 Release milestone on Jun 28, 2022
ThisT fromSnapshotExclusive(long fromSnapshotId);

/**
* Instructs this scan to look for changes up to a particular snapshot (inclusive).
Member

This is about setting the "end" snapshot ID, correct? We should probably note that here since that vocabulary is used in the previous methods.

aokolnychyi (Contributor, Author)

I aligned methods above.
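
For context, a rough usage sketch of how the snapshot-boundary methods are meant to fit together; the table, snapshot IDs, and the planFiles() call here are assumptions for illustration, not necessarily what this PR exposes:

```java
// Assumed usage: scan changes committed after startSnapshotId, up to and including endSnapshotId.
IncrementalChangelogScan scan = table.newIncrementalChangelogScan()
    .fromSnapshotExclusive(startSnapshotId)
    .toSnapshot(endSnapshotId);

try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
  for (ChangelogScanTask task : tasks) {
    // dispatch on the concrete task type (added rows, deleted rows, deleted data files)
  }
}
```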

RussellSpitzer (Member) left a comment

This all works for me; I'm only requesting a few modifications to the javadocs. The approach and API should work well.

@rdblue merged commit c69a3dd into apache:master on Jul 3, 2022
rdblue (Contributor) commented Jul 3, 2022

Looks great. Thanks for all the work and discussion, everyone!

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request on Jul 10, 2022