MR: apply row-level delete files when reading #1497

Merged

Collaborator

@chenjunjiedada chenjunjiedada commented Sep 23, 2020

This applies row-level delete files when reading with IcebergInputFormat. It also includes the changes from #985.

It also refactors the delete read unit tests into a separate base test class to avoid duplication.
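
For readers new to the feature, the idea of applying row-level deletes at read time can be sketched roughly as below. This is a simplified, self-contained illustration and not Iceberg's implementation: `Row`, `DeleteSketch`, and `applyDeletes` are hypothetical names, and the deletes are assumed to be already loaded into in-memory sets rather than read from delete files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a table row; not an Iceberg class.
final class Row {
  final int id;
  final String data;

  Row(int id, String data) {
    this.id = id;
    this.data = data;
  }
}

final class DeleteSketch {
  private DeleteSketch() {
  }

  // Keeps a row only if its position in the data file is not in deletedPositions
  // (position deletes) and its id is not in deletedIds (equality deletes on "id").
  static List<Row> applyDeletes(List<Row> rows, Set<Long> deletedPositions, Set<Integer> deletedIds) {
    List<Row> live = new ArrayList<>();
    for (int pos = 0; pos < rows.size(); pos++) {
      Row row = rows.get(pos);
      if (!deletedPositions.contains((long) pos) && !deletedIds.contains(row.id)) {
        live.add(row);
      }
    }
    return live;
  }
}
```

A mixed test such as testMixedPositionAndEqualityDeletes corresponds to passing both sets non-empty and checking that the surviving rows match the expected set.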

@@ -148,15 +144,15 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
);

DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
- table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, dataSchema);
Contributor

Can you import this class directly to avoid so many changes in this file?

Collaborator Author

Done.

@@ -269,4 +265,5 @@ private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
.forEach(set::add);
return set;
}

Contributor

Nit: unnecessary whitespace change

Collaborator Author

Removed.

@@ -129,7 +133,7 @@
// TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
checkResiduals(task);
}
- splits.add(new IcebergSplit(conf, task));
+ splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
Contributor

While I would like to get the encryption manager and io changes in, I don't think that they should be mixed into this commit. Was it necessary to do this for some reason?

Collaborator Author

The GenericDeleteFilter constructor needs a FileIO as a parameter.

Contributor

In that case, this can pass the FileIO somehow, or we can work on getting the other PR done before this one. But I don't think we should mix the two features together.

Contributor

For now, this could create a new HadoopFileIO and use that instead. That would be the easiest path forward.

Contributor

@holdenk just added a FileIO instance to this class, so you can use that instead of mixing the two PRs together. Thanks @holdenk!

Collaborator Author

Thank you @holdenk @rdblue. I updated this.

public static final Schema SCHEMA = new Schema(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get())
);
Contributor

Why not put the schema and spec in the parent class, DeletesReadTest? The data it generates is for this schema.

Collaborator Author

Updated.

@@ -92,6 +72,22 @@ public void testEqualityDeletes() throws IOException {
Assert.assertEquals("Table should contain expected rows", expected, actual);
}

protected void generateTestData() throws IOException {
this.records = new ArrayList<>();
Contributor

We prefer using Lists.newArrayList()

Collaborator Author

Done.

public void writeTestDataFile() throws IOException {
File tableDir = temp.newFolder();
tableDir.delete();
this.table = TestTables.create(tableDir, "test", SCHEMA, SPEC, 2);
Contributor

I think a better way to break down the class would be to have an abstract Table createTable(String name, Schema, Spec) method. Then the table and dataFile fields don't need to be shared. I also don't think that there is a need to make records public either.

Collaborator Author

Makes sense to me. Updated.
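
The structure suggested in this thread is essentially a template method: the base test owns the shared cases and asks each subclass only for a table. A rough, self-contained sketch follows. `FakeTable`, `DeletesReadTestSketch`, and the two subclasses are hypothetical stand-ins, and plain strings are used in place of Iceberg's schema and partition spec types.

```java
// Hypothetical stand-in for an Iceberg table; it only records who created it.
final class FakeTable {
  final String name;
  final String engine;

  FakeTable(String name, String engine) {
    this.name = name;
    this.engine = engine;
  }
}

// Base class: owns the shared logic and asks subclasses only for a table.
abstract class DeletesReadTestSketch {
  // Each engine-specific suite decides how the table is created.
  protected abstract FakeTable createTable(String name, String schema, String spec);

  // The table is a local here, so no table field is shared with subclasses.
  String describeTable() {
    FakeTable table = createTable("test", "id:int,data:string", "bucket(data,16)");
    return table.engine + ":" + table.name;
  }
}

final class GenericReadSketch extends DeletesReadTestSketch {
  @Override
  protected FakeTable createTable(String name, String schema, String spec) {
    return new FakeTable(name, "generic");
  }
}

final class InputFormatReadSketch extends DeletesReadTestSketch {
  @Override
  protected FakeTable createTable(String name, String schema, String spec) {
    return new FakeTable(name, "mr");
  }
}
```

With this shape, state such as the table or the data file can stay private to the base class, which is the motivation given in the review comment.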

@@ -248,6 +258,26 @@ public void close() throws IOException {
return iterable;
}

@SuppressWarnings("unchecked")
Contributor

Why is this needed?

Collaborator Author

deletes.filter(...) needs this.

Contributor

Okay, makes sense.

case GENERIC:
DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
Schema requiredSchema = deletes.requiredSchema();
iter = deletes.filter(openTask(currentTask, requiredSchema));
Contributor

Why not return deletes.filter(...) here? That would remove the need for iter and break.

Collaborator Author

Updated.
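
The point of the suggestion above is that returning from each `case` makes the shared `iter` local and the `break` statements unnecessary. A minimal sketch of that shape, with a hypothetical `Model` enum and `open` method standing in for the real reader code (how the real code handles the other models is not shown in this thread, so the `default` branch here is an assumption):

```java
import java.util.Arrays;
import java.util.Iterator;

final class SwitchReturnSketch {
  // Hypothetical stand-in for the in-memory data model setting.
  enum Model { PIG, HIVE, GENERIC }

  private SwitchReturnSketch() {
  }

  // Each supported case returns its iterator directly; no shared local or break is needed.
  static Iterator<String> open(Model model) {
    switch (model) {
      case GENERIC:
        return Arrays.asList("row-1", "row-2").iterator();
      case PIG:
      case HIVE:
      default:
        // Assumption for this sketch only: other models are rejected.
        throw new UnsupportedOperationException("Unsupported model: " + model);
    }
  }
}
```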

}
}

return parameters;
Contributor

Is there a simpler way to configure this? Normally, we build these using literals instead of a block of code.

Collaborator Author

Yes, I just updated it.
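
For illustration, building test parameters "using literals" usually means returning an array literal rather than filling a list in loops. The sketch below is an assumption about the general shape only: `ParametersSketch` is a hypothetical name and the values are invented, since the real parameters are not shown in this thread. In a JUnit 4 parameterized test, a method like this would also carry the `@Parameterized.Parameters` annotation.

```java
final class ParametersSketch {
  private ParametersSketch() {
  }

  // Hypothetical values: one row per combination, written out as a literal.
  static Object[][] parameters() {
    return new Object[][] {
        new Object[] { "parquet", "generic" },
        new Object[] { "orc", "generic" },
        new Object[] { "avro", "generic" }
    };
  }
}
```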

return rowSet(table, "*");
}

private static StructLikeSet rowSet(Table table, String... columns) {
Contributor

This method is what reads the rows from the table using Spark. Deleting this method and using the one in DeletesReadTest makes this test suite use the exact same read path as the generics -- IcebergGenerics.

You can probably make this method abstract and implement it in both classes to get around this. You'll also need to implement a read using the input format or Hive runner to test the Hive code.

Collaborator Author

Sorry, I missed this. I just added this back and also used the input format to read records.

@chenjunjiedada chenjunjiedada force-pushed the apply-row-level-deletes-when-reading branch from 8faeac6 to 84310d9 Compare September 24, 2020 03:30
Contributor

@rdsr rdsr left a comment

LGTM! The changes in MR to incorporate deletes are minimal and clean!

@chenjunjiedada chenjunjiedada force-pushed the apply-row-level-deletes-when-reading branch from 84310d9 to e1102d7 Compare September 29, 2020 06:50
@@ -370,7 +370,7 @@ public void testCustomCatalog() throws IOException {
testInputFormat.create(builder.conf()).validate(expectedRecords);
}

- private abstract static class TestInputFormat<T> {
+ public abstract static class TestInputFormat<T> {
Contributor

Why was this changed to public?

Collaborator Author

@chenjunjiedada chenjunjiedada Sep 29, 2020

This is because testInputFormat is of type TestIcebergInputFormats.TestInputFormat.Factory<Record>, which needs access to TestInputFormat.

Collaborator Author

I tried to minimize the changes to access modifiers in TestIcebergInputFormat. The last place that needs TestInputFormat to be public is the getRecords method.

@chenjunjiedada chenjunjiedada force-pushed the apply-row-level-deletes-when-reading branch 2 times, most recently from 0a952f9 to f5d382d Compare September 30, 2020 00:06
@@ -63,12 +82,20 @@ public void writeTestDataFile() throws IOException {
records.add(record.copy("id", 122, "data", "g"));

this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records);

Contributor

Nit: whitespace-only change.


@Before
- public void writeTestDataFile() throws IOException {
+ public void prepareData() throws IOException {
Contributor

Nit: unnecessary method renames cause more changed lines than needed.

- private List<Record> records = null;
- private DataFile dataFile = null;
+ private DataFile dataFile;
+ private List<Record> records;
Contributor

Changing the order of these two lines and dropping the default also causes unnecessary changes.

.bucket("data", 16)
.build();

protected final String testTableName = "test";
Contributor

I think this should be private. If it is needed by subclasses, it should be passed into methods, not shared. I think this is only used by Spark, so it should be easy to fix.

Contributor

rdblue commented Oct 2, 2020

@chenjunjiedada, I went ahead and fixed the remaining issues and opened a PR against your branch. Could you please take a look?

@rdblue rdblue merged commit 118cc80 into apache:master Oct 5, 2020
Contributor

rdblue commented Oct 5, 2020

Merged. Thanks, @chenjunjiedada!
