Spark 3.4: Output the net changes across snapshots for carryover rows in CDC (#7950)
flyrain authored Jul 1, 2023
1 parent: 0c37a97 · commit: f5346ca
Showing 7 changed files with 456 additions and 147 deletions.
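The headline change: the create_changelog_view procedure gains a net_changes parameter. With it, the generated changelog view reports only the net effect of each row across the selected snapshots, instead of echoing every intermediate delete/insert pair. Below is a minimal usage sketch assembled from the procedure calls exercised in the tests; it assumes a SparkSession with Iceberg's SQL extensions enabled, and the catalog name spark_catalog, the table name db.tbl, and the class name NetChangesSketch are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class NetChangesSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Fold the whole changelog into net changes: carryover rows (rows deleted
    // and re-inserted unchanged by copy-on-write rewrites) are dropped, and a
    // row's intermediate states collapse into its net effect.
    Dataset<Row> returns =
        spark.sql(
            "CALL spark_catalog.system.create_changelog_view("
                + "table => 'db.tbl', net_changes => true)");

    // The procedure returns the generated view's name in the first column.
    String viewName = returns.first().getString(0);
    spark.sql("SELECT * FROM " + viewName + " ORDER BY _change_ordinal").show();
  }
}

As the new testNetChangesWithComputeUpdates below asserts, net_changes => true cannot be combined with identifier_columns (update computation) and fails with an IllegalArgumentException.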
File: TestCreateChangelogViewProcedure.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.junit.Assert.assertThrows;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.ChangelogOperation;
@@ -45,13 +47,13 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

public void createTableWith2Columns() {
public void createTableWithTwoColumns() {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
}

private void createTableWith3Columns() {
private void createTableWithThreeColumns() {
sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
@@ -65,7 +67,7 @@ private void createTableWithIdentifierField() {

@Test
public void testCustomizedViewName() {
createTableWith2Columns();
createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
sql("INSERT INTO %s VALUES (2, 'b')", tableName);

@@ -98,7 +100,7 @@ public void testCustomizedViewName() {

@Test
public void testNoSnapshotIdInput() {
createTableWith2Columns();
createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();
@@ -129,7 +131,7 @@ public void testNoSnapshotIdInput() {

@Test
public void testTimestampsBasedQuery() {
createTableWith2Columns();
createTableWithTwoColumns();
long beginning = System.currentTimeMillis();

sql("INSERT INTO %s VALUES (1, 'a')", tableName);
@@ -189,7 +191,7 @@ public void testTimestampsBasedQuery() {

@Test
public void testWithCarryovers() {
createTableWith2Columns();
createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();
@@ -224,7 +226,7 @@ public void testWithCarryovers() {

@Test
public void testUpdate() {
createTableWith2Columns();
createTableWithTwoColumns();
sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);

@@ -283,7 +285,7 @@ public void testUpdateWithIdentifierField() {

@Test
public void testUpdateWithFilter() {
createTableWith2Columns();
createTableWithTwoColumns();
sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);

@@ -315,7 +317,7 @@ public void testUpdateWithFilter() {

@Test
public void testUpdateWithMultipleIdentifierColumns() {
createTableWith3Columns();
createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -347,7 +349,7 @@ public void testUpdateWithMultipleIdentifierColumns() {

@Test
public void testRemoveCarryOvers() {
createTableWith3Columns();
createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -381,7 +383,7 @@ public void testRemoveCarryOvers() {

@Test
public void testRemoveCarryOversWithoutUpdatedRows() {
createTableWith3Columns();
createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -411,9 +413,74 @@ public void testRemoveCarryOversWithoutUpdatedRows() {
sql("select * from %s order by _change_ordinal, id, data", viewName));
}

@Test
public void testNetChangesWithRemoveCarryOvers() {
// partitioned by id
createTableWithThreeColumns();

// insert rows: (1, 'a', 12) (2, 'b', 11) (2, 'e', 12)
sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap1 = table.currentSnapshot();

// delete rows: (2, 'b', 11) (2, 'e', 12)
// insert rows: (3, 'c', 13) (2, 'd', 11) (2, 'e', 12)
sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
table.refresh();
Snapshot snap2 = table.currentSnapshot();

// delete rows: (2, 'd', 11) (2, 'e', 12) (3, 'c', 13)
// insert rows: (3, 'c', 15) (2, 'e', 12)
sql("INSERT OVERWRITE %s VALUES (3, 'c', 15), (2, 'e', 12)", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();

// test with all snapshots
List<Object[]> returns =
sql(
"CALL %s.system.create_changelog_view(table => '%s', net_changes => true)",
catalogName, tableName);

String viewName = (String) returns.get(0)[0];

assertEquals(
"Rows should match",
ImmutableList.of(
row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
row(3, "c", 15, INSERT, 2, snap3.snapshotId()),
row(2, "e", 12, INSERT, 2, snap3.snapshotId())),
sql("select * from %s order by _change_ordinal, data", viewName));

// test with snap2 and snap3
sql(
"CALL %s.system.create_changelog_view(table => '%s', "
+ "options => map('start-snapshot-id','%s'), "
+ "net_changes => true)",
catalogName, tableName, snap1.snapshotId());

assertEquals(
"Rows should match",
ImmutableList.of(
row(2, "b", 11, DELETE, 0, snap2.snapshotId()),
row(3, "c", 15, INSERT, 1, snap3.snapshotId())),
sql("select * from %s order by _change_ordinal, data", viewName));
}

@Test
public void testNetChangesWithComputeUpdates() {
createTableWithTwoColumns();
assertThrows(
"Should fail because net_changes is not supported with computing updates",
IllegalArgumentException.class,
() ->
sql(
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

@Test
public void testNotRemoveCarryOvers() {
createTableWith3Columns();
createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
File: ChangelogIterator.java
@@ -20,8 +20,10 @@

import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import org.apache.iceberg.ChangelogOperation;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -35,16 +37,28 @@ public abstract class ChangelogIterator implements Iterator<Row> {

private final Iterator<Row> rowIterator;
private final int changeTypeIndex;
private final StructType rowType;

protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) {
this.rowIterator = rowIterator;
this.rowType = rowType;
this.changeTypeIndex = rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name());
}

protected int changeTypeIndex() {
return changeTypeIndex;
}

protected StructType rowType() {
return rowType;
}

protected String changeType(Row row) {
String changeType = row.getString(changeTypeIndex());
Preconditions.checkNotNull(changeType, "Change type should not be null");
return changeType;
}

protected Iterator<Row> rowIterator() {
return rowIterator;
}
@@ -79,7 +93,35 @@ public static Iterator<Row> removeCarryovers(Iterator<Row> rowIterator, StructType
return Iterators.filter(changelogIterator, Objects::nonNull);
}

public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, StructType rowType) {
ChangelogIterator changelogIterator = new RemoveNetCarryoverIterator(rowIterator, rowType);
return Iterators.filter(changelogIterator, Objects::nonNull);
}

protected boolean isSameRecord(Row currentRow, Row nextRow, int[] indicesToIdentifySameRow) {
for (int idx : indicesToIdentifySameRow) {
if (isDifferentValue(currentRow, nextRow, idx)) {
return false;
}
}

return true;
}

protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) {
return !Objects.equals(nextRow.get(idx), currentRow.get(idx));
}

protected static int[] generateIndicesToIdentifySameRow(
int totalColumnCount, Set<Integer> metadataColumnIndices) {
int[] indices = new int[totalColumnCount - metadataColumnIndices.size()];

for (int i = 0, j = 0; i < indices.length; i++) {
if (!metadataColumnIndices.contains(i)) {
indices[j] = i;
j++;
}
}
return indices;
}
}
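To make the shared helper concrete: a self-contained sketch of generateIndicesToIdentifySameRow, with the helper body copied from the diff above. The class name and the four-column schema are hypothetical; the only metadata column (_change_type) sits at the tail, as it does in the changelog row layout.

import java.util.Arrays;
import java.util.Set;

public class SameRowIndicesSketch {
  // Body copied verbatim from ChangelogIterator.generateIndicesToIdentifySameRow.
  static int[] generateIndicesToIdentifySameRow(
      int totalColumnCount, Set<Integer> metadataColumnIndices) {
    int[] indices = new int[totalColumnCount - metadataColumnIndices.size()];
    for (int i = 0, j = 0; i < indices.length; i++) {
      if (!metadataColumnIndices.contains(i)) {
        indices[j] = i;
        j++;
      }
    }
    return indices;
  }

  public static void main(String[] args) {
    // Schema (id, data, age, _change_type): the metadata column is index 3.
    int[] indices = generateIndicesToIdentifySameRow(4, Set.of(3));
    System.out.println(Arrays.toString(indices)); // prints [0, 1, 2]
    // Two rows are "the same record" when they agree on these data columns,
    // whatever their change-type metadata says.
  }
}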
File: ComputeUpdateIterator.java
@@ -81,15 +81,13 @@ public Row next() {
// either a cached record which is not an UPDATE or the next record in the iterator.
Row currentRow = currentRow();

if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Row nextRow = rowIterator().next();
cachedRow = nextRow;

if (sameLogicalRow(currentRow, nextRow)) {
String nextRowChangeType = nextRow.getString(changeTypeIndex());

Preconditions.checkState(
nextRowChangeType.equals(INSERT),
changeType(nextRow).equals(INSERT),
"Cannot compute updates because there are multiple rows with the same identifier"
+ " fields([%s]). Please make sure the rows are unique.",
String.join(",", identifierFields));
@@ -118,7 +116,7 @@ private Row modify(Row row, int valueIndex, Object value) {
}

private boolean cachedUpdateRecord() {
return cachedRow != null && cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER);
return cachedRow != null && changeType(cachedRow).equals(UPDATE_AFTER);
}

private Row currentRow() {
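For orientation on the hunks above: when identifier_columns are supplied, this iterator folds a DELETE immediately followed by an INSERT for the same logical row into an UPDATE_BEFORE/UPDATE_AFTER pair, and the precondition rejects duplicate identifiers. A hedged sketch of the call shape, reusing the placeholder names from the first example:

import org.apache.spark.sql.SparkSession;

public class ComputeUpdatesSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // A DELETE directly followed by an INSERT that agrees on 'id' is rewritten
    // as UPDATE_BEFORE / UPDATE_AFTER; multiple rows with the same identifier
    // trip the Preconditions.checkState shown above.
    spark.sql(
        "CALL spark_catalog.system.create_changelog_view("
            + "table => 'db.tbl', identifier_columns => array('id'))");

    // Per testNetChangesWithComputeUpdates, identifier_columns cannot be
    // combined with net_changes => true.
  }
}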
File: RemoveCarryoverIterator.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark;

import java.util.Iterator;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

@@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {

RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
super(rowIterator, rowType);
this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size());
this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
}

@Override
@@ -88,7 +90,7 @@ public Row next() {
}

// If the current row is a delete row, drain all identical delete rows
if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
cachedDeletedRow = currentRow;
deletedRowCount = 1;

@@ -98,8 +100,8 @@
// row is the same record
while (nextRow != null
&& cachedDeletedRow != null
&& isSameRecord(cachedDeletedRow, nextRow)) {
if (nextRow.getString(changeTypeIndex()).equals(INSERT)) {
&& isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) {
if (changeType(nextRow).equals(INSERT)) {
deletedRowCount--;
if (deletedRowCount == 0) {
cachedDeletedRow = null;
@@ -139,25 +141,8 @@ private boolean hasCachedDeleteRow() {
return cachedDeletedRow != null;
}

private int[] generateIndicesToIdentifySameRow(int columnSize) {
int[] indices = new int[columnSize - 1];
for (int i = 0; i < indices.length; i++) {
if (i < changeTypeIndex()) {
indices[i] = i;
} else {
indices[i] = i + 1;
}
}
return indices;
}

private boolean isSameRecord(Row currentRow, Row nextRow) {
for (int idx : indicesToIdentifySameRow) {
if (isDifferentValue(currentRow, nextRow, idx)) {
return false;
}
}

return true;
private int[] generateIndicesToIdentifySameRow() {
Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex());
return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
}
}
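To see carryover removal end to end, a minimal sketch driving the public factory on ChangelogIterator directly. It assumes Iceberg's Spark runtime on the classpath; the class name and the three-row changelog are illustrative, with (2, 'e') deleted and re-inserted unchanged:

import java.util.Arrays;
import java.util.Iterator;
import org.apache.iceberg.ChangelogOperation;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.spark.ChangelogIterator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class CarryoverRemovalSketch {
  public static void main(String[] args) {
    StructType rowType =
        new StructType()
            .add("id", DataTypes.IntegerType)
            .add("data", DataTypes.StringType)
            .add(MetadataColumns.CHANGE_TYPE.name(), DataTypes.StringType);

    String insert = ChangelogOperation.INSERT.name();
    String delete = ChangelogOperation.DELETE.name();

    // (2, 'e') is deleted and re-inserted with identical values: a carryover
    // produced by a copy-on-write rewrite, not a user-visible change.
    Iterator<Row> rows =
        Arrays.asList(
                RowFactory.create(1, "a", insert),
                RowFactory.create(2, "e", delete),
                RowFactory.create(2, "e", insert))
            .iterator();

    // The carryover pair cancels out; only (1, 'a', INSERT) is emitted.
    ChangelogIterator.removeCarryovers(rows, rowType)
        .forEachRemaining(System.out::println);
  }
}

The new RemoveNetCarryoverIterator behind removeNetCarryovers applies the same cancellation across snapshot boundaries, which is what lets the procedure report only net changes.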
(Diffs for the remaining three changed files are not shown.)