API: Introduce a new IncrementalAppendScan interface #4580
Conversation
Here is the japi-compliance-checker report for the iceberg-api jar before and after this change.

@@ -107,24 +68,6 @@ default TableScan select(String... columns) {
    return select(Lists.newArrayList(columns));
Couldn't move this `select` method to the `Scan` interface, as the API compatibility check tool shows that the return type changes from `TableScan` to `Scan`, probably due to varargs.
 *
 * Default behavior for incremental scan fails if there are overwrite operations in the incremental snapshot range
 */
IncrementalTableScan ignoreOverwrites();
For V2 tables, an incremental scan is potentially interested in append, overwrite, and delete operations. Append is always included. I guess we need to discuss what the natural API is for controlling overwrite and delete. This `ignoreOverwrites` API might be too restrictive; then we might also need to expose an `includeOverwrites`. I am wondering if we should have an API like:

include(DataOperations... operations) // default is append only; for CDC read, overwrite and delete can be added
fail(DataOperations... operations) // default is to fail on nothing
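A minimal sketch of that proposed shape (hypothetical, not part of this PR; in the actual library `org.apache.iceberg.DataOperations` is a holder of String constants, so the varargs are modeled as Strings here):

```java
// Hypothetical sketch of the include/fail filtering API suggested above.
public interface IncrementalTableScan extends Scan<IncrementalTableScan> {

  // Default is append only; for a CDC read, overwrite and delete can be added,
  // e.g. include(DataOperations.APPEND, DataOperations.OVERWRITE).
  IncrementalTableScan include(String... operations);

  // Default is to fail on nothing; snapshots with other operations are skipped.
  IncrementalTableScan fail(String... operations);
}
```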
Or maybe we just need the `appendsOnly` API above; otherwise, overwrite and delete snapshots are also included automatically for V2 tables.
> Or maybe we just need the `appendsOnly` API above; otherwise, overwrite and delete snapshots are also included automatically for V2 tables.

I agree. I think that for V2 tables, including overwrite and delete snapshots should be the default behavior.
For the logical completeness of filtering snapshot types, should we have the following three methods?
- `ignoreAppend`
- `ignoreOverwrite`
- `ignoreDelete`

Then `appendsOnly` is basically a combination of `ignoreOverwrite` and `ignoreDelete`.
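A hypothetical sketch of that composition (method names follow the suggestion above; they are not part of the actual API):

```java
// Hypothetical: three ignore methods plus appendsOnly() as their composition.
public interface IncrementalTableScan extends Scan<IncrementalTableScan> {
  IncrementalTableScan ignoreAppend();

  IncrementalTableScan ignoreOverwrite();

  IncrementalTableScan ignoreDelete();

  // appendsOnly() is then just the combination of the two ignores.
  default IncrementalTableScan appendsOnly() {
    return ignoreOverwrite().ignoreDelete();
  }
}
```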
Agreed that the three ignore methods are complete and flexible. Wondering if we need the flexibility, though. E.g., does `append + overwrite` or `overwrite + delete` ever make sense? Do we only need two modes: (1) appendsOnly, (2) append + overwrite + delete?

Do we need fail APIs (like `failOverwrite`)? My current take is no; let's just skip the snapshots we are not interested in.
Yes, it is flexible. `append + overwrite` makes sense for users who only want to get inserted rows with some additional filtering; `overwrite + delete` makes sense for getting only deleted rows.

I'm not aware of a use case for `failOverwrite`. We may skip it for now.
How about `dataBetween(Long startSnapshotId, long endSnapshot, List<RowKind> rowKinds)`? Then we could produce exact data according to the given row kinds.
@chenjunjiedada `RowKind` is a Flink API.

Based on the conversation, it seems most people prefer the fluent-style API for the scan builder, like `fromSnapshotId(long fromSnapshotId)`.
I meant we could borrow the `RowKind` definition for the produced data, like what @flyrain mentioned before. We could use `+I` to target `append` + `overwrite` with some filter, and use `-D`, `-U` to target `delete` and some data in `overwrite`.
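For illustration, an engine-agnostic equivalent of those kinds could look like this (a hypothetical enum borrowing Flink's `RowKind` short strings; not part of this PR):

```java
// Hypothetical Iceberg-side row-kind enum, shown only to illustrate borrowing
// the definition without depending on the Flink API.
public enum ChangeKind {
  INSERT("+I"),         // rows added by append or overwrite snapshots
  UPDATE_BEFORE("-U"),  // pre-image of an updated row
  UPDATE_AFTER("+U"),   // post-image of an updated row
  DELETE("-D");         // rows removed by delete or overwrite snapshots

  private final String shortString;

  ChangeKind(String shortString) {
    this.shortString = shortString;
  }

  public String shortString() {
    return shortString;
  }
}
```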
 * @param fromSnapshotId the start snapshot id (exclusive)
 * @return an incremental table scan from {@code fromSnapshotId} exclusive
 */
IncrementalTableScan fromSnapshotId(long fromSnapshotId);
Would it be more generic to add an `inclusive` flag to this `fromSnapshotId` method? (So that we can meet the requirements of both inclusive and exclusive `fromSnapshotId` incremental scans.)
Agree. Based on this, should we add a method like `useSnapshot` to process just one snapshot? I think it's useful for reading an Iceberg table in streaming mode.
Actually, the `fromSnapshotId` behavior is exclusive, i.e. `(from, to]`, for the incremental scan, as the `toSnapshotId` will become the `fromSnapshotId` in the next scan.

For the inclusive behavior, we were mainly talking about the starting strategy. E.g., if we set a specific start snapshot id, we want to include the files in that snapshot (if it is an append). To support that, we need the incremental scan to support a nullable `fromSnapshotId`, as we will just pass in the parent snapshot id (which can be null). That wasn't possible with `TableScan#appendsBetween(long fromSnapshotId, long toSnapshotId)`.
If we want to add exclusive behavior, then we should add alternative methods like `afterSnapshotId`. To me, `fromSnapshotId` should be inclusive of the snapshot that is identified. We can come up with better names for these, like `fromSnapshotInclusive` and `fromSnapshotExclusive`, if you like those better.
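A minimal sketch of that pair of methods on the new interface (the Javadoc wording is illustrative):

```java
// Sketch of the inclusive/exclusive starting methods discussed above; the
// exact Javadoc in the merged change may differ.
public interface IncrementalAppendScan extends Scan<IncrementalAppendScan> {

  /**
   * Instructs this scan to look for changes starting from a particular snapshot (inclusive).
   *
   * @param fromSnapshotId the start snapshot id (inclusive)
   * @return an incremental append scan starting at the given snapshot
   */
  IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId);

  /**
   * Instructs this scan to look for changes starting after a particular snapshot (exclusive).
   *
   * @param fromSnapshotId the start snapshot id (exclusive)
   * @return an incremental append scan starting after the given snapshot
   */
  IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId);
}
```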
@@ -148,7 +89,9 @@ default TableScan select(String... columns) {
   * @return a table scan which can read append data from {@code fromSnapshotId}
   * exclusive and up to {@code toSnapshotId} inclusive
   */
- TableScan appendsBetween(long fromSnapshotId, long toSnapshotId);
+ default TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
+   throw new UnsupportedOperationException("Incremental scan is not supported");
We will need to keep this implementation for at least one minor release?
According to the Apache project compatibility rules.
This change is not strictly required for this PR; I am OK with reverting it. I added it here because I was thinking about removing the duplicated `UnsupportedOperationException` code in many `TableScan` impl classes.

We can mark those two `appends` methods as deprecated once the new `IncrementalScan` impls are ready. Yes, we can follow the compatibility rules.
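For illustration, the later deprecation could look roughly like this (a sketch; the actual deprecation message and timing are not decided in this PR):

```java
// Sketch of a possible future deprecation of the old incremental method.
public interface TableScan extends Scan<TableScan> {

  /**
   * @deprecated use {@link Table#newIncrementalAppendScan()} instead
   */
  @Deprecated
  default TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
    throw new UnsupportedOperationException("Incremental scan is not supported");
  }
}
```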
Thanks @stevenzwu for the PR. I'm OK with the change, but I doubt that CDC can use the `IncrementalTableScan` interface. Basically, CDC requires much finer control of planning; check my CDC PR (#4539) for more details. We can keep evolving the `IncrementalTableScan` interface to make it suitable for CDC in the future, but it is hard to connect them at this moment. We may focus on the incremental scan itself in this PR.
@flyrain This is just a starting point. I am sure the current
 *
 * Default behavior for incremental scan fails if there are overwrite operations in the incremental snapshot range
 */
IncrementalScan ignoreOverwrites();
Maybe we could add another option, `IncrementalScan rowKinds(RowKind... rowKinds)`, to support the CDC case.
`RowKind` is a Flink API; we can't use it here.
Can we borrow the data definition from the Flink side? The incremental scan actually targets those four kinds of data, right?
@@ -148,7 +89,9 @@ default TableScan select(String... columns) {
 * @return a table scan which can read append data from {@code fromSnapshotId}
 * exclusive and up to {@code toSnapshotId} inclusive
 */
TableScan appendsBetween(long fromSnapshotId, long toSnapshotId);
I think we should probably deprecate this because we want people to move to incremental (eventually).
Yes, once the new `IncrementalAppendScan` is implemented, we can mark these two `appends` methods as deprecated.
/**
 * Only interested in snapshots with append operation
 */
IncrementalScan appendsOnly();
As we talked about, I think it makes sense to remove these two methods since the default for scanning appends is to ignore deletes and overwrites and to read only append snapshots.
/**
 * API for configuring an incremental table scan
 */
public interface IncrementalScan extends Scan<IncrementalScan> {
`IncrementalAppendScan`?
 *
 * @return an incremental scan for appends only snapshots
 */
default IncrementalAppendScan newIncrementalAppendScan() {
@flyrain @aokolnychyi @openinx @chenjunjiedada @yittg @hameizi After discussing with @rdblue, we think it is probably cleaner to have separate newScan methods for the appends-only and CDC reads. In the future, we can add a `Table#newIncrementalChangelogScan` method and an `IncrementalChangelogScan` interface.
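For context, using the new append-only scan would look roughly like this (a sketch; the snapshot ids are placeholders and `planFiles()` comes from the extracted `Scan` super interface):

```java
import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class IncrementalScanExample {
  // Plan the files appended in the snapshot range (lastSeenSnapshotId, currentSnapshotId].
  static void planIncremental(Table table, long lastSeenSnapshotId, long currentSnapshotId)
      throws IOException {
    IncrementalAppendScan scan = table.newIncrementalAppendScan()
        .fromSnapshotExclusive(lastSeenSnapshotId)
        .toSnapshot(currentSnapshotId);

    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      tasks.forEach(task -> System.out.println(task.file().path()));
    }
  }
}
```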
/**
 * API for configuring an incremental table scan for appends only snapshots
 */
public interface IncrementalAppendScan extends Scan<IncrementalAppendScan> {
I think this can be abstracted into `IncrementalScan`, because not only the append-only scan but also the changelog incremental scan needs to specify `from` and `to` snapshots. We can define the `IncrementalScan` interface, and then return different concrete implementations from `Table` through different methods, for example:
public interface IncrementalScan extends Scan<IncrementalScan> {...}
abstract class BaseIncrementalScan implements IncrementalScan {...}
public class AppendOnlyIncrementalScan extends BaseIncrementalScan {...}
public class ChangelogIncrementalScan extends BaseIncrementalScan {...}
public interface Table {
  ...
  IncrementalScan newAppendIncrementalScan();
  IncrementalScan newChangelogIncrementalScan();
  ...
}
@Reo-LEI conceptually I agree with you. @rdblue prefers to do the refactoring when we come to the changelog incremental scan.

The reason we didn't add `BaseIncrementalScan` is that right now there is no difference between `BaseIncrementalScan` and `IncrementalAppendScan`. In the future, we can extract a `BaseIncrementalScan`, or we can have `IncrementalChangelogScan` extend `IncrementalAppendScan`. Personally, I also prefer the `BaseIncrementalScan` approach.
/**
 * Optional. if from snapshot id (inclusive or exclusive) is not provided,
 * the oldest ancestor of the {@link IncrementalAppendScan#toSnapshot(long)}
 * will be included as the from snapshot.
Javadoc should start with a brief description of the method, then follow that with new paragraphs explaining more about the method's behavior. That's because Javadoc is going to pull out the first part as the description and the rest is available when you navigate to the method details.
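For example, a structure like this (illustrative wording only):

```java
/**
 * Instructs this scan to look for changes starting from a particular snapshot (inclusive).
 * <p>
 * The first sentence above becomes the summary that Javadoc pulls out; this
 * paragraph and any that follow only appear in the full method details.
 *
 * @param fromSnapshotId the start snapshot id (inclusive)
 * @return this scan configured with the new start snapshot
 */
IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId);
```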
IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId);
/**
 * Required
I think this needs a short Javadoc description.
 * <p>
 * If the start snapshot (inclusive or exclusive) is not provided,
 * the oldest ancestor of the {@link IncrementalAppendScan#toSnapshot(long)}
 * will be included as the start snapshot.
@stevenzwu, this isn't true. If the starting snapshot is not set, then it defaults to `null`, which will scan from the start of table history or fail if table history has expired.
@rdblue yes, it defaults to `null`. The described behavior is based on `IncrementalDataTableScan` using `SnapshotUtil.ancestorsBetween`, which scans from the oldest ancestor of the `toSnapshotId`. I think the current behavior makes sense. The start of table history may not be an ancestor of the `toSnapshotId`, right?
I don't think the proposed behavior makes sense for this interface. Otherwise, there is no way to incrementally scan from the start of the table. If I want to start from the beginning of history, I need to specify starting snapshot `null`. But there's no way to do that without leaving out the "from" snapshot. If that's how to configure scanning from the start of history, then this can't scan from the oldest known snapshot by default.
If the oldest table snapshot is not an ancestor of the `toSnapshot`, what does that mean? I thought an incremental scan is only meaningful along a linear ancestor line.

E.g., we have two disjoint lineages:

S1 -> S2 ----------> S5
S3 -> S4 -> S6 -> S7

If the `toSnapshotId` is set to `S7` and `fromSnapshotId` is not set, I thought we want to scan [S3, S4, S6, S7]. Is that correct?
The problem is when the history has expired, not when there is no ancestor relationship. When the starting point is not an ancestor, that's a different problem that results in an exception.
Just to make sure I understand you correctly: if `fromSnapshotId` is not set and defaults to null, we want to start from the snapshot with the oldest timestamp using this `Table` API: `Iterable<Snapshot> snapshots()`. If the oldest snapshot is not an ancestor of the current table snapshot, we throw an exception.

I assume we don't want to use this `Table` API to find the oldest snapshot (by timestamp): `List<HistoryEntry> history()`.
If no `fromSnapshot` method is called, the incremental read should start from the beginning of table history, the `null` snapshot. So we need to find a snapshot with `parent-snapshot-id=null`.
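A minimal sketch of that lookup against the existing `Table` API (assuming a snapshot whose `parentId()` is `null` marks the start of history, per the discussion above):

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class StartingSnapshotUtil {
  // Sketch: find a snapshot with no parent, i.e. the beginning of table history.
  // If history has expired, every retained snapshot has a parent, so we fail.
  static Snapshot findStartingSnapshot(Table table) {
    for (Snapshot snapshot : table.snapshots()) {
      if (snapshot.parentId() == null) {
        return snapshot;
      }
    }
    throw new IllegalStateException("No starting snapshot found: table history may have expired");
  }
}
```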
If the table history has disjoint lineage lines, then we can have multiple snapshots with `parent-snapshot-id=null`. I guess we can then use the timestamp to break the tie.
@rdblue I updated the Javadoc based on the discussion here.
@rdblue can you take another look and see if the comments are addressed adequately?
 * Refine the incremental scan with the start snapshot inclusive.
 * <p>
 * If the start snapshot (inclusive or exclusive) is not provided,
 * the oldest snapshot will be used as the start snapshot.
I don't think this statement is clear enough. The table's first snapshot is used. You could argue that's the "oldest" but I think it is better to be clear that you're asking to process snapshots back to the start of the table. Clarifying on the next line helps, but it exposes an internal detail about how we track history: a snapshot with no parent is the starting snapshot.
Can you see if the latest change is more clear?
During review of the Flink FLIP-27 source PR #4329, we agreed that the streaming start strategy should be inclusive. Hence we would need `TableScan#appendsBetween` to support a nullable `fromSnapshotId`. Right now, `fromSnapshotId` is a primitive `long`. #4329 (comment)

Initially, @rdblue and I were thinking of just overloading `appendsBetween` with a `Long fromSnapshotId`. That would cause a compile error due to ambiguity of type resolution. As we want to maintain binary backward compatibility, I tried adding a new method named `appendsInRange` in PR #4529, which is not as intuitive a name as `appendsBetween`.

This is a different direction from PR #4529, suggested by @rdblue. Instead of modifying the `TableScan` interface, maybe we can consider introducing a new `IncrementalTableScan` interface and a `Table#newIncrementalScan` method:
- It decouples `TableScan` and `IncrementalTableScan`. This also helps avoid the need for `UnsupportedOperationException` in some `TableScan` methods.
- Fluent-style `IncrementalTableScan` API.

To avoid code duplication, a new super interface of `Scan<T extends Scan>` was extracted as the parent of the current `TableScan` and the new `IncrementalTableScan`. I ran `japi-compliance-checker` to check binary compatibility after the interface refactoring. Will attach the result.
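For reference, the extracted self-referential super interface would look roughly like this (a sketch with a few representative methods, following the `{...}` style of the earlier snippet; the actual interface carries more shared configuration methods):

```java
// Sketch of the extracted Scan super interface; the self-referential type
// parameter lets fluent calls keep returning the concrete scan type.
public interface Scan<T extends Scan> {
  T caseSensitive(boolean caseSensitive);

  T filter(Expression expr);

  T select(Collection<String> columns);

  CloseableIterable<FileScanTask> planFiles();
}

// Both scan interfaces extend it with themselves as T:
public interface TableScan extends Scan<TableScan> {...}
public interface IncrementalTableScan extends Scan<IncrementalTableScan> {...}
```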