-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MR: apply row-level delete files when reading #1497
MR: apply row-level delete files when reading #1497
Conversation
@@ -148,15 +144,15 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { | |||
); | |||
|
|||
DeleteFile eqDeletes = FileHelpers.writeDeleteFile( | |||
table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema); | |||
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, dataSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you import this class directly to avoid so many changes in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -269,4 +265,5 @@ private StructLikeSet rowSetWithoutIds(int... idsToRemove) { | |||
.forEach(set::add); | |||
return set; | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unnecessary whitespace change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
@@ -129,7 +133,7 @@ | |||
// TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet | |||
checkResiduals(task); | |||
} | |||
splits.add(new IcebergSplit(conf, task)); | |||
splits.add(new IcebergSplit(conf, task, table.io(), table.encryption())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I would like to get the encryption manager and io changes in, I don't think that they should be mixed into this commit. Was it necessary to do this for some reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GenericDeleteFilter
constructor needs FileIO
as parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, this can pass the FileIO
somehow, or we can work on getting the other PR done before this one. But I don't think we should mix the two features together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, this could create a new HadoopFileIO
and use that instead. That would be the easiest path forward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static final Schema SCHEMA = new Schema( | ||
required(1, "id", Types.IntegerType.get()), | ||
required(2, "data", Types.StringType.get()) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not put the schema and spec in the parent class, DeletesReadTest
? The data it generates is for this schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
@@ -92,6 +72,22 @@ public void testEqualityDeletes() throws IOException { | |||
Assert.assertEquals("Table should contain expected rows", expected, actual); | |||
} | |||
|
|||
protected void generateTestData() throws IOException { | |||
this.records = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We prefer using Lists.newArrayList()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
public void writeTestDataFile() throws IOException { | ||
File tableDir = temp.newFolder(); | ||
tableDir.delete(); | ||
this.table = TestTables.create(tableDir, "test", SCHEMA, SPEC, 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a better way to break down the class would be to have an abstract Table createTable(String name, Schema, Spec)
method. Then the table
and dataFile
fields don't need to be shared. I also don't think that there is a need to make records
public either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense to me. Updated.
@@ -248,6 +258,26 @@ public void close() throws IOException { | |||
return iterable; | |||
} | |||
|
|||
@SuppressWarnings("unchecked") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deletes.filter(...)
needs this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, makes sense.
case GENERIC: | ||
DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema); | ||
Schema requiredSchema = deletes.requiredSchema(); | ||
iter = deletes.filter(openTask(currentTask, requiredSchema)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not return deletes.filter(...)
here? That would remove the need for iter
and break
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
} | ||
} | ||
|
||
return parameters; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a simpler way to configure this? Normally, we build these using literals instead of a block of code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I just updated it.
return rowSet(table, "*"); | ||
} | ||
|
||
private static StructLikeSet rowSet(Table table, String... columns) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is what reads the rows from the table using Spark. Deleting this method and using the one in DeletesReadTest
makes this test suite use the exact same read path as the generics -- IcebergGenerics
.
You can probably make this method abstract and implement it in both classes to get around this. You'll also need to implement a read using the input format or Hive runner to test the Hive code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I missed this. I just added this back and also use input format to read records.
8faeac6
to
84310d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!. The changes in MR to incorporate deletes are minimal and clean!
data/src/test/java/org/apache/iceberg/data/DeletesReadTest.java
Outdated
Show resolved
Hide resolved
data/src/test/java/org/apache/iceberg/data/GenericReaderDeletesTest.java
Outdated
Show resolved
Hide resolved
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
Outdated
Show resolved
Hide resolved
mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
84310d9
to
e1102d7
Compare
data/src/test/java/org/apache/iceberg/data/DeletesReadTest.java
Outdated
Show resolved
Hide resolved
data/src/test/java/org/apache/iceberg/data/DeletesReadTest.java
Outdated
Show resolved
Hide resolved
data/src/test/java/org/apache/iceberg/data/DeletesReadTest.java
Outdated
Show resolved
Hide resolved
data/src/test/java/org/apache/iceberg/data/GenericReaderDeletesTest.java
Outdated
Show resolved
Hide resolved
mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
@@ -370,7 +370,7 @@ public void testCustomCatalog() throws IOException { | |||
testInputFormat.create(builder.conf()).validate(expectedRecords); | |||
} | |||
|
|||
private abstract static class TestInputFormat<T> { | |||
public abstract static class TestInputFormat<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this changed to public
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because the testInputFormat
is type of TestIcebergInputFormats.TestInputFormat.Factory<Record>
which needs to access the TestInputFormat
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to minimize the changes to access modifiers in TestIcebergInputFormat
, the last place that needs the modifier of TestInputFormat
to be public is getRecords
method.
0a952f9
to
f5d382d
Compare
@@ -63,12 +82,20 @@ public void writeTestDataFile() throws IOException { | |||
records.add(record.copy("id", 122, "data", "g")); | |||
|
|||
this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: whitespace-only change.
|
||
@Before | ||
public void writeTestDataFile() throws IOException { | ||
public void prepareData() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unnecessary method renames cause more changed lines than needed.
private List<Record> records = null; | ||
private DataFile dataFile = null; | ||
private DataFile dataFile; | ||
private List<Record> records; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing the order of these two lines and dropping the default also causes unnecessary changes.
.bucket("data", 16) | ||
.build(); | ||
|
||
protected final String testTableName = "test"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be private. If it is needed by subclases, it should be passed into methods, not shared. I think this is only used by Spark, so it should be easy to fix.
3e6140b
to
f20d633
Compare
@chenjunjiedada, I went ahead and fixed the remaining issues and opened a PR against your branch. Could you please take a look? |
Merged. Thanks, @chenjunjiedada! |
This applies row-level delete files when reading for IcebergInputFormat. This also includes changes from #985.
This also refactors the deletes read unit tests to a separated base test class to avoid duplication.