
API: Introduce a new IncrementalAppendScan interface #4580

Merged
merged 11 commits into from
May 13, 2022

Conversation

stevenzwu
Contributor

@stevenzwu stevenzwu commented Apr 18, 2022

During review of the Flink FLIP-27 source PR #4329, we agreed that the streaming start strategy should be inclusive. Hence we would need TableScan#appendsBetween to support a nullable fromSnapshotId. Right now, fromSnapshotId is a primitive long.
#4329 (comment)

Initially, @rdblue and I were thinking of just overloading appendsBetween with a Long fromSnapshotId. That would cause a compile error due to ambiguous type resolution. As we want to maintain binary backward compatibility, I tried adding a new method named appendsInRange in PR #4529, which is not as intuitive a name as appendsBetween.

This PR takes a different direction from PR #4529, suggested by @rdblue. Instead of modifying the TableScan interface, we can consider introducing a new IncrementalTableScan interface and a Table#newIncrementalScan method.

  • We can decouple the regular TableScan from the new IncrementalTableScan. This also avoids the need to throw UnsupportedOperationException from some TableScan methods.
  • More complex incremental scans (like reading CDC data) can be added to the new IncrementalTableScan API in the future.

To avoid code duplication, a new super interface Scan<T extends Scan> was extracted as the parent of the current TableScan and the new IncrementalTableScan. I ran japi-compliance-checker to check binary compatibility after the interface refactoring and will attach the result.
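As a rough illustration of the extracted hierarchy (names and methods below are simplified and hypothetical, not the exact Iceberg API), a self-referential type parameter on the shared Scan interface lets fluent methods return the concrete scan type:

```java
// Sketch of the refactoring idea: a shared Scan super interface whose
// self-referential type parameter preserves the concrete scan type when
// chaining. Illustrative only, not the real Iceberg interfaces.
class ScanHierarchyDemo {
  interface Scan<ThisT extends Scan<ThisT>> {
    ThisT caseSensitive(boolean caseSensitive);
  }

  interface IncrementalTableScan extends Scan<IncrementalTableScan> {
    IncrementalTableScan fromSnapshotInclusive(long snapshotId);
  }

  static class SimpleIncrementalScan implements IncrementalTableScan {
    boolean caseSensitive = true;
    long fromSnapshotId = -1L;

    public IncrementalTableScan caseSensitive(boolean cs) {
      this.caseSensitive = cs;
      return this;
    }

    public IncrementalTableScan fromSnapshotInclusive(long id) {
      this.fromSnapshotId = id;
      return this;
    }
  }

  // Chaining the shared caseSensitive method does not lose the
  // IncrementalTableScan type, so fromSnapshotInclusive is still callable.
  static long demo() {
    SimpleIncrementalScan scan = new SimpleIncrementalScan();
    scan.caseSensitive(false).fromSnapshotInclusive(42L);
    return scan.fromSnapshotId;
  }
}
```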

@github-actions github-actions bot added the API label Apr 18, 2022
@stevenzwu stevenzwu changed the title Core: Introduce a new IncrementalTableScan interface API: Introduce a new IncrementalTableScan interface Apr 18, 2022
@stevenzwu
Contributor Author

Here is the japi-compliance-checker report for the iceberg-api jar before and after this change:

api-compatibility-report.pdf

@stevenzwu
Contributor Author

@rdblue @openinx @yittg @aokolnychyi @flyrain @flashJd can you please take a look at this approach and see if it makes sense?

@@ -107,24 +68,6 @@ default TableScan select(String... columns) {
return select(Lists.newArrayList(columns));
Contributor Author

I couldn't move this select method to the Scan interface; the API compatibility check tool shows that the return type changes from TableScan to Scan, probably due to varargs.

@stevenzwu
Contributor Author

cc @hameizi the author of PR #3095

*
* Default behavior for incremental scan fails if there are overwrite operations in the incremental snapshot range
*/
IncrementalTableScan ignoreOverwrites();
Contributor Author

For V2 tables, an incremental scan is potentially interested in append, overwrite, and delete operations. Append is always included. I guess we need to discuss what the natural API is to control overwrite and delete.

This ignoreOverwrites API might be too restrictive; then we would also need to expose an includeOverwrites. I am wondering if we should have an API like

include(DataOperations... operations) // default is append only; for CDC reads, overwrite and delete can be added
fail(DataOperations... operations) // default is to fail on nothing
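A minimal sketch of how that include/fail configuration could behave (the DataOperation enum and class name here are hypothetical stand-ins, not the Iceberg API):

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical configuration object illustrating the include/fail idea
// from the comment above; not the actual Iceberg scan interface.
class IncrementalScanConfigDemo {
  enum DataOperation { APPEND, OVERWRITE, DELETE }

  // Defaults per the comment: only appends are included, fail on nothing
  private final Set<DataOperation> included = EnumSet.of(DataOperation.APPEND);
  private final Set<DataOperation> failOn = EnumSet.noneOf(DataOperation.class);

  IncrementalScanConfigDemo include(DataOperation... operations) {
    for (DataOperation op : operations) {
      included.add(op);
    }
    return this;
  }

  IncrementalScanConfigDemo fail(DataOperation... operations) {
    for (DataOperation op : operations) {
      failOn.add(op);
    }
    return this;
  }

  Set<DataOperation> included() { return included; }
  Set<DataOperation> failOn() { return failOn; }
}
```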

Contributor Author

Or maybe we just need the appendsOnly API above. Otherwise, overwrite and delete snapshots are also included automatically for V2 tables.

Contributor

> or maybe we should just need the appendsOnly API above. otherwise, overwrite and delete snapshots are also included automatically for V2 tables.

I agree, because I think that for V2 tables, including overwrite and delete snapshots should be the default behavior.

Contributor

For the logical completeness of filtering snapshot type, should we have the following 3 methods?

  1. ignoreAppend
  2. ignoreOverwrite
  3. ignoreDelete

And appendsOnly is basically a combination of ignoreOverwrite and ignoreDelete.

Contributor Author
Contributor Author

@stevenzwu stevenzwu Apr 19, 2022


Agreed, the 3 ignore methods are complete and flexible. Wondering if we need the flexibility, though. E.g., does append + overwrite or overwrite + delete ever make sense? Do we only need two modes: (1) appendsOnly, (2) append + overwrite + delete?

Do we need fail APIs (like failOverwrite)? My current take is no. Let's just skip the snapshots we are not interested in.

Contributor

Yes, it is flexible. append + overwrite makes sense for users who only want to get inserted rows with some additional filtering. overwrite + delete makes sense for getting only deleted rows.
I'm not aware of a use case for failOverwrite. We may skip it for now.

Collaborator

@chenjunjiedada chenjunjiedada Apr 22, 2022


How about dataBetween(Long startSnapshotId, long endSnapshotId, List<RowKind> rowKinds)? Then we could produce the exact data according to the given row kinds.

Contributor Author

@chenjunjiedada RowKind is a Flink API.

Based on the conversation, it seems most people prefer a fluent-style API for the scan builder, like fromSnapshotId(long fromSnapshotId).

Collaborator

I meant we could borrow the RowKind definition for the produced data, like what @flyrain mentioned before. We could use +I to target append + overwrite with some filter, and use -D, -U to target delete and some data in overwrite.

* @param fromSnapshotId the start snapshot id (exclusive)
* @return an incremental table scan from {@code fromSnapshotId} exclusive
*/
IncrementalTableScan fromSnapshotId(long fromSnapshotId);
Member

Would it be more generic to add an inclusive flag to this fromSnapshotId method? (So that we can meet the requirement for both including and excluding fromSnapshotId in the incremental scan.)

Contributor

Agreed. Based on this, should we add a method like useSnapshot to process just one snapshot? I think it's useful for reading an Iceberg table in streaming mode.

Contributor Author

Actually, fromSnapshotId has exclusive behavior (from, to] for the incremental scan, as the toSnapshotId will become the fromSnapshotId in the next scan.

For the inclusive behavior, we were mainly talking about the starting strategy. E.g., if we set a specific start snapshot id, we want to include the files in that snapshot (if it is an append). To support that, we need the incremental scan to support a nullable fromSnapshotId, as we will just pass in the parent snapshot id (which can be null). That wasn't possible with TableScan#appendsBetween(long fromSnapshotId, long toSnapshotId).

Contributor

If we want to add exclusive behavior, then we should add alternative methods like afterSnapshotId. To me, fromSnapshotId should be inclusive of the snapshot that is identified. We can come up with better names for these, like fromSnapshotInclusive and fromSnapshotExclusive if you like those better.
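A small sketch of the inclusive/exclusive semantics over a linear lineage (the helper below is hypothetical and only illustrates the naming suggested above; it assumes both ids are present in the lineage):

```java
import java.util.List;

// Illustrative helper, not Iceberg code: resolve which snapshot ids fall in
// the scan range over a linear lineage ordered oldest-first.
class SnapshotRangeDemo {
  // The to-snapshot is always inclusive; the from-snapshot is inclusive or
  // exclusive depending on which fluent method name was used.
  static List<Long> range(List<Long> lineage, long fromId, boolean fromInclusive, long toId) {
    int start = lineage.indexOf(fromId) + (fromInclusive ? 0 : 1);
    int end = lineage.indexOf(toId) + 1;
    return lineage.subList(start, end);
  }
}
```

With lineage [1, 2, 3, 4], fromSnapshotExclusive(2) would cover snapshots 3 and 4, while fromSnapshotInclusive(2) would cover 2, 3, and 4.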

@@ -148,7 +89,9 @@ default TableScan select(String... columns) {
* @return a table scan which can read append data from {@code fromSnapshotId}
* exclusive and up to {@code toSnapshotId} inclusive
*/
TableScan appendsBetween(long fromSnapshotId, long toSnapshotId);
default TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
throw new UnsupportedOperationException("Incremental scan is not supported");
Member

We will need to keep this implementation for at least one minor release?

Member

According to the Apache project compatibility rules.

Contributor Author

This change is not strictly required for this PR; I am OK to revert it. I added it here because I was thinking about removing the duplicated UnsupportedOperationException code in many TableScan impl classes.

We can mark those two appends methods as deprecated once the new IncrementalScan impls are ready. Yes, we can follow the compatibility rules.

@flyrain
Contributor

flyrain commented Apr 19, 2022

Thanks @stevenzwu for the PR. I'm OK with the change, but I doubt that CDC can use the IncrementalTableScan interface. Basically, CDC requires much finer control of planning; check my CDC PR (#4539) for more details. We can keep evolving the IncrementalTableScan interface to make it suitable for CDC in the future, but it is hard to connect them at this moment. We may focus on the incremental scan itself in this PR.

@stevenzwu
Contributor Author

stevenzwu commented Apr 19, 2022

@flyrain This is just a starting point. I am sure the current IncrementalScan interface is NOT good for CDC reads today, which need more complex planning control. That was also part of the motivation when Ryan suggested it. Can CDC reads leverage this IncrementalScan interface in the future once it is enhanced? Will this direction work for CDC reads?

*
* Default behavior for incremental scan fails if there are overwrite operations in the incremental snapshot range
*/
IncrementalScan ignoreOverwrites();
Collaborator

Maybe we could add another option, IncrementalScan rowKinds(RowKind... rowKinds), to support the CDC case.

Contributor Author

RowKind is a Flink API. We can't use it here.

Collaborator

Can we borrow the definition of the data kinds from the Flink side? The incremental scan actually targets those four kinds of data, right?

@@ -148,7 +89,9 @@ default TableScan select(String... columns) {
* @return a table scan which can read append data from {@code fromSnapshotId}
* exclusive and up to {@code toSnapshotId} inclusive
*/
TableScan appendsBetween(long fromSnapshotId, long toSnapshotId);
Contributor

I think we should probably deprecate this because we want people to move to incremental. (Eventually)

Contributor Author

Yes, once the new IncrementalAppendScan is implemented, we can mark these two appends methods as deprecated.

/**
* Only interested in snapshots with append operation
*/
IncrementalScan appendsOnly();
Contributor

@rdblue rdblue Apr 22, 2022


As we talked about, I think it makes sense to remove these two methods since the default for scanning appends is to ignore deletes and overwrites and to read only append snapshots.

/**
* API for configuring an incremental table scan
*/
public interface IncrementalScan extends Scan<IncrementalScan> {
Contributor

IncrementalAppendScan?

*
* @return an incremental scan for appends only snapshots
*/
default IncrementalAppendScan newIncrementalAppendScan() {
Contributor Author

@flyrain @aokolnychyi @openinx @chenjunjiedada @yittg @hameizi After discussing with @rdblue, we think it is probably cleaner to have separate newScan methods for appends-only and CDC reads.

In the future, we can add a Table#newIncrementalChangelogScan method and an IncrementalChangelogScan interface.

/**
* API for configuring an incremental table scan for appends only snapshots
*/
public interface IncrementalAppendScan extends Scan<IncrementalAppendScan> {
Contributor

I think this can be abstracted into IncrementalScan, because not only append-only but also changelog incremental scans need to specify the from and to snapshots. We can define the IncrementalScan interface and then return different concrete implementations from Table through different methods, for example:

public interface IncrementalScan extends Scan<IncrementalScan> {...}

abstract class BaseIncrementalScan implements IncrementalScan {...}
public class AppendOnlyIncrementalScan extends BaseIncrementalScan {...}
public class ChangelogIncrementalScan extends BaseIncrementalScan {...}

public interface Table {
   ...
  IncrementalScan newAppendIncrementalScan();
  IncrementalScan newChangelogIncrementalScan();
  ...
}

Contributor Author

@Reo-LEI conceptually I agree with you. @rdblue prefers to do the refactoring when we come to the changelog incremental scan.

The reason we didn't add BaseIncrementalScan is that right now there would be no difference between BaseIncrementalScan and IncrementalAppendScan. In the future, we can extract BaseIncrementalScan, or we can have IncrementalChangelogScan extend IncrementalAppendScan. Personally, I also prefer the BaseIncrementalScan approach.

/**
* Optional. if from snapshot id (inclusive or exclusive) is not provided,
* the oldest ancestor of the {@link IncrementalAppendScan#toSnapshot(long)}
* will be included as the from snapshot.
Contributor

Javadoc should start with a brief description of the method, then follow that with new paragraphs explaining more about the method's behavior. That's because Javadoc is going to pull out the first part as the description and the rest is available when you navigate to the method details.

IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId);

/**
* Required
Contributor

I think this needs a short Javadoc description.

* <p>
* If the start snapshot (inclusive or exclusive) is not provided,
* the oldest ancestor of the {@link IncrementalAppendScan#toSnapshot(long)}
* will be included as the start snapshot.
Contributor

@stevenzwu, this isn't true. If the starting snapshot is not set, then it defaults to null, which will scan from the start of table history or fail if table history has expired.

Contributor Author

@rdblue yes, it defaults to null. The described behavior is based on IncrementalDataTableScan using SnapshotUtil.ancestorsBetween, which scans from the oldest ancestor of the toSnapshotId. I think the current behavior makes sense. The start of table history may not be an ancestor of the toSnapshotId, right?
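For illustration, the ancestor walk being described can be sketched like this (a simplified stand-in for SnapshotUtil.ancestorsBetween, not the actual implementation):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Illustrative only: walk parent pointers from toSnapshotId back toward
// fromSnapshotId (exclusive). A null fromSnapshotId walks all the way to
// the oldest ancestor of toSnapshotId.
class AncestorWalkDemo {
  static List<Long> ancestorsBetween(Map<Long, Long> parentOf, Long fromSnapshotId, long toSnapshotId) {
    Deque<Long> result = new ArrayDeque<>();
    Long current = toSnapshotId;
    while (current != null && !current.equals(fromSnapshotId)) {
      result.addFirst(current); // collect in oldest-first order
      current = parentOf.get(current); // null when there is no parent
    }
    return new ArrayList<>(result);
  }
}
```

For the S3 -> S4 -> S6 -> S7 lineage discussed below, a null fromSnapshotId with toSnapshotId = S7 would yield [S3, S4, S6, S7].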

Contributor

I don't think the proposed behavior makes sense for this interface. Otherwise, there is no way to incrementally scan from the start of the table. If I want to start from the beginning of history, I need to specify starting snapshot null. But there's no way to do that without leaving out the "from" snapshot. If that's how to configure scanning from the start of history, then this can't scan from the oldest known snapshot by default.

Contributor Author

If the oldest table snapshot is not an ancestor of the toSnapshot, what does that mean? I thought an incremental scan is only meaningful along a linear ancestor line.

E.g., we have two disjoint lineages:
S1 -> S2 ----------> S5
S3 -> S4 -> S6 -> S7

If the toSnapshotId is set to S7 and fromSnapshotId is not set, I thought we want to scan [S3, S4, S6, S7]. Is that correct?

Contributor

The problem is when the history has expired, not when there is no ancestor relationship. When the starting point is not an ancestor, that's a different problem that results in an exception.

Contributor Author

Just to make sure I understand you correctly: if fromSnapshotId is not set and defaults to null, we want to start from the snapshot with the oldest timestamp using the Table API Iterable<Snapshot> snapshots(). If the oldest snapshot is not an ancestor of the current table snapshot, we throw an exception.

I assume we don't want to use this Table API to find the oldest snapshot (by timestamp):

List<HistoryEntry> history()

Contributor

If no fromSnapshot method is called, the incremental read should start from the beginning of table history, the null snapshot. So we need to find a snapshot with parent-snapshot-id=null.

Contributor Author

If the table history has disjoint lineage lines, then we can have multiple snapshots with parent-snapshot-id=null. I guess we can then use the timestamp to break the tie.
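A hedged sketch of that resolution rule (the Snap record and field names below are illustrative stand-ins for Iceberg's snapshot metadata, not the real classes):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative only: when no fromSnapshot is given, start from a snapshot
// with no parent, using the oldest timestamp to break ties between
// disjoint lineages, per the discussion above.
class StartSnapshotDemo {
  // Minimal stand-in for snapshot metadata
  record Snap(long snapshotId, Long parentSnapshotId, long timestampMillis) {}

  static Optional<Snap> startingSnapshot(List<Snap> snapshots) {
    return snapshots.stream()
        .filter(s -> s.parentSnapshotId() == null) // roots of lineages
        .min(Comparator.comparingLong(Snap::timestampMillis)); // oldest wins
  }
}
```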

Contributor Author

@rdblue I updated the Javadoc based on the discussion here.

Contributor Author

@rdblue can you take another look and see if the comments are addressed adequately?

@stevenzwu stevenzwu changed the title API: Introduce a new IncrementalTableScan interface API: Introduce a new IncrementalAppendScan interface Apr 29, 2022
* Refine the incremental scan with the start snapshot inclusive.
* <p>
* If the start snapshot (inclusive or exclusive) is not provided,
* the oldest snapshot will be used as the start snapshot.
Contributor

I don't think this statement is clear enough. The table's first snapshot is used. You could argue that's the "oldest" but I think it is better to be clear that you're asking to process snapshots back to the start of the table. Clarifying on the next line helps, but it exposes an internal detail about how we track history: a snapshot with no parent is the starting snapshot.

Contributor Author

Can you see if the latest change is more clear?

@rdblue rdblue merged commit beed94d into apache:master May 13, 2022
@stevenzwu stevenzwu deleted the refactorScanAPI branch July 26, 2022 18:01