API: Add a scan for changes #4870

Merged
merged 2 commits into from
Jul 3, 2022
API: Add IncrementalChangelogScan
aokolnychyi committed Jun 28, 2022
commit e8b11009b5f80deff0481eda76098ad25d1aac40
42 changes: 42 additions & 0 deletions api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;

/**
* A scan task for inserted data records generated by adding a data file to the table.
* <p>
* Note that added files may have matching delete files. If so, such delete files will be included
* in this task and must be applied while reading the data file.
*/
public interface AddedRowsScanTask extends ChangelogScanTask, ContentScanTask<DataFile> {
/**
* A list of {@link DeleteFile delete files} to apply when reading the task's data file.
*
* @return a list of delete files to apply
*/
List<DeleteFile> deletes();

@Override
default ChangelogOperation operation() {
return ChangelogOperation.INSERT;
}
}
27 changes: 27 additions & 0 deletions api/src/main/java/org/apache/iceberg/ChangelogOperation.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* An enum representing possible operations in a changelog.
*/
public enum ChangelogOperation {
INSERT, DELETE
}
40 changes: 40 additions & 0 deletions api/src/main/java/org/apache/iceberg/ChangelogScanTask.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* A changelog scan task.
*/
public interface ChangelogScanTask extends ScanTask {
/**
* Returns the operation type of records produced by this task (i.e. insert/delete).
*/
ChangelogOperation operation();

/**
* Returns the relative change order in which the changes must be applied.
*/
int changeOrder();

/**
* Returns the snapshot ID in which the changes were committed.
*/
long commitSnapshotId();
Contributor

@flyrain flyrain Jun 2, 2022

Do we restrict this scan task to one snapshot? I feel like it and its subclasses can be used for a scan across multiple snapshots, which is useful, for example, to scan position deletes across snapshots. In that case, commitSnapshotId is going to be optional.

Contributor Author

When we scan for changes across snapshots, I think we should output all changes per snapshot by default. If I understand you correctly, you want to support use cases where the user is only interested in net changes across snapshots. I think that's possible too, but the records produced by a task would still have the ID of some snapshot in which they were committed.

For instance, we add data file A in S1. Then we add a delete file D in S2 that matches data file A. In this case, we would output AddedDataFileScanTask with commit snapshot S1, but it will have delete file D associated with it so that we only output records that remain there in S2.

Contributor

What will commitSnapshotId() return in that case: S1, S2, or a list of snapshot IDs? I'd think the snapshot IDs may not be useful if we want to calculate the net changes across snapshots. For example, IncrementalAppendScan could be considered one of the cases that outputs scan tasks across snapshots, and it doesn't need the snapshot ID.

One solution is to add another layer of interfaces to support net changes across snapshots. For example, we can use ChangelogScanTask as a parent for both the single-snapshot and multi-snapshot cases, and then create a new interface, SnapshotChangelogScanTask, that extends it for the single-snapshot use case.

I think the current solution is good enough for a CDC implementation. But since it is an interface change, we may want to make sure it is extensible for future use cases. If we are going to support net changes across snapshots, we may want to choose the right hierarchy and names now.

Contributor Author

After thinking more about this, I agree that a commit snapshot ID doesn't make sense for net changes across multiple snapshots in some cases. For instance, suppose we have 2 snapshots that add delete files for the same data file. In this case, we will have a single task with one data file and two deletes added in different snapshots.

Having said that, I am still not sure having separate tasks is a good idea. Apart from more tasks to support in readers, what schema would we produce? Our changelog metadata table should have the same schema regardless of whether we include only net changes or all changes for each snapshot.

We can make the commit snapshot ID null when only net changes are produced, skip the commit snapshot ID in both cases, or offer some sort of metadata map where the snapshot ID may or may not be set.

Let me think more about this.

Contributor

It's OK to leave commitSnapshotId and commitOrder null in the case of net changes. I assume the intent of the commitSnapshotId and commitOrder methods is to enable readers to project them.

| ID | Name | Age | _record_type | _commit_snapshot_id | _commit_timestamp | _commit_order |
|----|------|-----|--------------|---------------------|---------------------|---------------|
| 1 | Amy | 21 | Insert | 8563478696506531065 | 2021-01-23 04:30:45 | 34 |

Considering the CDC record format above, does the reader need `commitTimestamp` as well?

Contributor Author

@flyrain @rdblue, I agree with you but what about DeletedRowsScanTask?

Suppose I have a data file file_A:

id
--
1
2
3
4
5

And then I have two delete snapshots S1 and S2. S1 deletes IDs 1, 2 and S2 deletes IDs 3, 4. If we are to assign snapshot IDs to tasks, we will have to produce two DeletedRowsScanTasks:

file_A & deletes from S1 (inherit the snapshot ID and commit order from S1)
file_A & deletes from S2 (inherit the snapshot ID and commit order from S2)

Even if we combine these two tasks in one group, it will be suboptimal as we will scan through the same data file twice. It would be more efficient to combine these deletes against the same data file and do one pass to determine deleted records.
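
The single-pass idea above can be sketched roughly as follows. This is a toy model, not Iceberg code: rows are plain integers, each snapshot's deletes are an in-memory set, and the names `OnePassDeletes` and `deletedRows` are made up for illustration. It only shows that one pass over file_A can still record which snapshot removed each row.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these names exist in Iceberg.
final class OnePassDeletes {
  private OnePassDeletes() {}

  /**
   * Scans the data file rows once and maps each deleted row to the ID of the
   * first snapshot (in map iteration order) whose deletes match it.
   */
  static Map<Integer, Long> deletedRows(List<Integer> dataFileRows,
                                        Map<Long, Set<Integer>> deletesBySnapshot) {
    Map<Integer, Long> deleted = new LinkedHashMap<>();
    for (Integer row : dataFileRows) {
      for (Map.Entry<Long, Set<Integer>> entry : deletesBySnapshot.entrySet()) {
        if (entry.getValue().contains(row)) {
          deleted.put(row, entry.getKey());
          break; // the earliest matching snapshot is the one that deleted the row
        }
      }
    }
    return deleted;
  }

  public static void main(String[] args) {
    // Assumption: the map iterates in commit order (S1 before S2), hence LinkedHashMap.
    Map<Long, Set<Integer>> deletes = new LinkedHashMap<>();
    deletes.put(1L, Set.of(1, 2)); // S1 deletes IDs 1, 2
    deletes.put(2L, Set.of(3, 4)); // S2 deletes IDs 3, 4
    System.out.println(deletedRows(List.of(1, 2, 3, 4, 5), deletes));
  }
}
```

In this model the snapshot ID becomes a property of each deleted record rather than of the task, which is the tension with a task-level `commitSnapshotId()` discussed in this thread.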

Contributor

@aokolnychyi, I'd say that's a special form of combining. We could introduce a CombinedDeletedRowsScanTask that takes care of the snapshot handling.

Contributor Author

We won't be able to extend ChangelogScanTask in CombinedDeletedRowsScanTask, though? Unless we remove the commit snapshot ID from ChangelogScanTask?

Contributor

I see your point, but I'm thinking that we would just throw an exception if it were called. I'd also be up for returning null and passing each sub-task's ID and ordinal separately.

Contributor Author

@aokolnychyi aokolnychyi Jul 1, 2022

An exception sounds reasonable. Using nulls in a public API is also not the best idea.
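
The option favored here, throwing instead of returning null, might look roughly like this. It is only a sketch: `CombinedDeletedRowsScanTask` was proposed in this thread and is not part of this PR, and `ChangelogTaskSketch`, `CombinedTaskSketch`, and `subTaskSnapshotIds()` are hypothetical stand-ins so that the example compiles without Iceberg.

```java
import java.util.List;

// Hypothetical stand-in for ChangelogScanTask, reduced to one method.
interface ChangelogTaskSketch {
  long commitSnapshotId();
}

// Hypothetical combined task that covers deletes from several snapshots.
final class CombinedTaskSketch implements ChangelogTaskSketch {
  private final List<Long> subTaskSnapshotIds;

  CombinedTaskSketch(List<Long> subTaskSnapshotIds) {
    this.subTaskSnapshotIds = List.copyOf(subTaskSnapshotIds);
  }

  @Override
  public long commitSnapshotId() {
    // No single snapshot ID describes this task, so fail loudly instead of returning null.
    throw new UnsupportedOperationException("Combined task spans multiple snapshots");
  }

  // Each sub-task's snapshot ID is exposed separately.
  List<Long> subTaskSnapshotIds() {
    return subTaskSnapshotIds;
  }
}
```

Because `commitSnapshotId()` in this PR returns a primitive `long`, returning null would also mean changing the return type to `Long`; throwing keeps the signature unchanged.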

}
42 changes: 42 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;

/**
* A scan task for deleted data records generated by removing a data file from the table.
* <p>
* Note that all historical delete files added before must be applied while reading the data file to
* output only those data records that were live when the data file was removed.
*/
public interface DeletedDataFileScanTask extends ChangelogScanTask, ContentScanTask<DataFile> {
/**
* A list of {@link DeleteFile delete files} to apply when reading the task's data file.
*
* @return a list of delete files to apply
*/
List<DeleteFile> deletes();

@Override
default ChangelogOperation operation() {
return ChangelogOperation.DELETE;
}
}
49 changes: 49 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;

/**
* A scan task for deleted data records generated by adding delete files to the table.
*/
public interface DeletedRowsScanTask extends ChangelogScanTask, ContentScanTask<DataFile> {
/**
* A list of added {@link DeleteFile delete files} that apply to the task's data file.
* Records removed by these delete files should appear as deletes in the changelog.
*
* @return a list of added delete files
*/
List<DeleteFile> addedDeletes();

/**
* A list of {@link DeleteFile delete files} that existed before and must be applied prior to
* determining which records are deleted by delete files in {@link #addedDeletes()}.
* Records removed by these delete files should not appear in the changelog.
*
* @return a list of existing delete files
*/
List<DeleteFile> existingDeletes();

@Override
default ChangelogOperation operation() {
return ChangelogOperation.DELETE;
}
}
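
To make the roles of `addedDeletes()` and `existingDeletes()` concrete, here is a minimal sketch under simplifying assumptions: rows are integers and each group of delete files is modeled as a set of row values. `DeletedRowsSketch` and `changelogDeletes` are hypothetical names, not Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how a reader might interpret a DeletedRowsScanTask.
final class DeletedRowsSketch {
  private DeletedRowsSketch() {}

  /**
   * Returns the rows that should appear as deletes in the changelog: rows that
   * were still live (not matched by existing deletes) and are matched by the
   * newly added deletes.
   */
  static List<Integer> changelogDeletes(List<Integer> dataRows,
                                        Set<Integer> existingDeletes,
                                        Set<Integer> addedDeletes) {
    List<Integer> deleted = new ArrayList<>();
    for (Integer row : dataRows) {
      if (existingDeletes.contains(row)) {
        continue; // already removed earlier, so it must not show up in the changelog
      }
      if (addedDeletes.contains(row)) {
        deleted.add(row);
      }
    }
    return deleted;
  }

  public static void main(String[] args) {
    // Row 1 was deleted earlier; the added deletes match rows 1, 2 and 3.
    System.out.println(changelogDeletes(List.of(1, 2, 3, 4, 5), Set.of(1), Set.of(1, 2, 3)));
  }
}
```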
38 changes: 1 addition & 37 deletions api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java
@@ -23,41 +23,5 @@
 /**
 * API for configuring an incremental table scan for appends only snapshots
 */
-public interface IncrementalAppendScan extends Scan<IncrementalAppendScan, FileScanTask, CombinedScanTask> {
-
-/**
-* Refine the incremental scan with the start snapshot inclusive.
-* <p>
-* If neither {@link #fromSnapshotInclusive(long)} or {@link #fromSnapshotExclusive(long)} is provided,
-* start snapshot inclusive is defaulted to the oldest ancestor of the end snapshot.
-*
-* @param fromSnapshotId the start snapshot id inclusive
-* @return an incremental table scan from {@code fromSnapshotId} inclusive
-* @throws IllegalArgumentException if the start snapshot is not an ancestor
-* of the end snapshot
-*/
-IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId);
-
-/**
-* Refine the incremental scan with the start snapshot exclusive.
-* <p>
-* If neither {@link #fromSnapshotInclusive(long)} or {@link #fromSnapshotExclusive(long)} is provided,
-* start snapshot inclusive is defaulted to the oldest ancestor of the end snapshot.
-*
-* @param fromSnapshotId the start snapshot id (exclusive)
-* @return an incremental table scan from {@code fromSnapshotId} exclusive
-* @throws IllegalArgumentException if the start snapshot is not an ancestor
-* of the end snapshot
-*/
-IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId);
-
-/**
-* Refine the incremental scan with the end snapshot inclusive.
-* <p>
-* If not provided, end snapshot is defaulted to the current table snapshot.
-*
-* @param toSnapshotId the end snapshot id (inclusive)
-* @return an incremental table scan up to {@code toSnapshotId} inclusive
-*/
-IncrementalAppendScan toSnapshot(long toSnapshotId);
+public interface IncrementalAppendScan extends IncrementalScan<IncrementalAppendScan, FileScanTask, CombinedScanTask> {
 }
27 changes: 27 additions & 0 deletions api/src/main/java/org/apache/iceberg/IncrementalChangelogScan.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* API for configuring a scan for table changes.
*/
public interface IncrementalChangelogScan
extends IncrementalScan<IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
}
59 changes: 59 additions & 0 deletions api/src/main/java/org/apache/iceberg/IncrementalScan.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* API for configuring an incremental scan.
*/
public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> extends Scan<ThisT, T, G> {
/**
* Refines this scan with a start snapshot ID (inclusive).
* <p>
* If the start snapshot is not configured, it is defaulted to the oldest ancestor
* of the end snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (inclusive)
* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);

/**
* Refines this scan with a start snapshot ID (exclusive).
* <p>
* If the start snapshot is not configured, it is defaulted to the oldest ancestor
* of the end snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (exclusive)
* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
*/
ThisT fromSnapshotExclusive(long fromSnapshotId);

/**
* Instructs this scan to look for changes up to a particular snapshot (inclusive).
Member

This is about setting the "end" snapshot ID, correct? We should probably note that here since that vocab is used in the previous snapshots.

Contributor Author

I aligned methods above.

* <p>
* If the end snapshot is not configured, it is defaulted to the current table snapshot (inclusive).
*
* @param toSnapshotId the end snapshot ID (inclusive)
* @return this for method chaining
*/
ThisT toSnapshot(long toSnapshotId);
}
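
The `ThisT` type parameter lets each refinement method return the concrete scan type, so calls chain without casts. Below is a self-contained toy version of that pattern. It assumes an immutable implementation that returns a new instance per refinement and does not model the ancestry check; the names `IncrementalScanSketch`, `ChangelogScanSketch`, and `describe()` are invented for this example.

```java
// Hypothetical reduction of IncrementalScan to its three refinement methods.
interface IncrementalScanSketch<ThisT> {
  ThisT fromSnapshotInclusive(long fromSnapshotId);

  ThisT fromSnapshotExclusive(long fromSnapshotId);

  ThisT toSnapshot(long toSnapshotId);
}

// Hypothetical concrete scan that binds ThisT to itself.
final class ChangelogScanSketch implements IncrementalScanSketch<ChangelogScanSketch> {
  private final Long fromSnapshotId; // null means "oldest ancestor of the end snapshot"
  private final boolean fromInclusive;
  private final Long toSnapshotId;   // null means "current table snapshot"

  ChangelogScanSketch() {
    this(null, true, null);
  }

  private ChangelogScanSketch(Long fromSnapshotId, boolean fromInclusive, Long toSnapshotId) {
    this.fromSnapshotId = fromSnapshotId;
    this.fromInclusive = fromInclusive;
    this.toSnapshotId = toSnapshotId;
  }

  @Override
  public ChangelogScanSketch fromSnapshotInclusive(long id) {
    return new ChangelogScanSketch(id, true, toSnapshotId);
  }

  @Override
  public ChangelogScanSketch fromSnapshotExclusive(long id) {
    return new ChangelogScanSketch(id, false, toSnapshotId);
  }

  @Override
  public ChangelogScanSketch toSnapshot(long id) {
    return new ChangelogScanSketch(fromSnapshotId, fromInclusive, id);
  }

  // Renders the configured snapshot range in interval notation.
  String describe() {
    String start = fromSnapshotId == null
        ? "[oldest ancestor"
        : (fromInclusive ? "[" : "(") + fromSnapshotId;
    String end = toSnapshotId == null ? "current]" : toSnapshotId + "]";
    return start + ", " + end;
  }
}
```

With this sketch, `new ChangelogScanSketch().fromSnapshotExclusive(1L).toSnapshot(3L)` is still typed as `ChangelogScanSketch`, which is the property `IncrementalAppendScan` and `IncrementalChangelogScan` get by passing themselves as `ThisT`.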
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
@@ -64,6 +64,17 @@ default IncrementalAppendScan newIncrementalAppendScan() {
throw new UnsupportedOperationException("Incremental append scan is not supported");
}

/**
* Create a new {@link IncrementalChangelogScan} for this table.
* <p>
* Once a scan is created, it can be refined to project columns and filter data.
*
* @return an incremental changelog scan
*/
default IncrementalChangelogScan newIncrementalChangelogScan() {
Contributor

Do we want changelog or simply change?

Contributor Author

@aokolnychyi aokolnychyi Jun 30, 2022

@stevenzwu, how do you feel about the names? You know more about CDC use cases. I think I'd slightly prefer shorter names like ChangeScan and ChangeScanTask but if changelog is really widely used, I have no problem using it as well.

Contributor Author

Seems changelog is pretty popular. Let's keep it as-is.

throw new UnsupportedOperationException("Incremental changelog scan is not supported");
}

/**
* Return the {@link Schema schema} for this table.
*