Flink: Add Orc value reader, writer implementations #1255
Conversation
I've fixed the broken unit test in the latest patch. It can be reviewed now.
Rebase to resolve the conflicts with #1197.
    return visit(flinkType, schema.asStruct(), visitor);
  }

  private static <T> T visit(LogicalType flinkType, Type iType, FlinkOrcSchemaVisitor<T> visitor) {
It seems the flinkType here is just used for the exception message at the end of the primitive method, right?
The logicalType is mainly used for getting fields from list/map/struct; you can see ListWriter, MapWriter, and StructWriter. We generate an elementGetter for ListWriter and use it to read the elements of a list.
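A minimal sketch of that idea, assuming Flink's ArrayData.createElementGetter API is available; the class and method names below are illustrative, not the PR's actual code:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.logical.LogicalType;

class ListElementReadSketch {
  // Illustrative sketch, not the PR's code: the element LogicalType is what lets
  // us build a getter that pulls typed elements out of an ArrayData one by one.
  static List<Object> readElements(ArrayData array, LogicalType elementType) {
    ArrayData.ElementGetter getter = ArrayData.createElementGetter(elementType);
    List<Object> elements = new ArrayList<>(array.size());
    for (int i = 0; i < array.size(); i++) {
      elements.add(getter.getElementOrNull(array, i));
    }
    return elements;
  }
}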
  case STRING:
    return FlinkOrcWriters.strings();
  case UUID:
    return GenericOrcWriters.uuids();
For the UUID type, Flink should return GenericOrcWriters.bytes() instead of GenericOrcWriters.uuids(); I will fix this in the next patch. The reader will also need to be fixed.
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

abstract class FlinkSchemaVisitor<T> {
I think I understand why you needed to build a FlinkSchemaVisitor. Unlike Spark, Flink seems to have no common super interface for its internal data types (list, map, struct) like Spark's SpecializedGetters. So we have to know up front which type to pass to the struct, map, and list writers, whereas in Spark we can simply pass the parent SpecializedGetters object and get the right data type from within it.
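To illustrate the point, here is a sketch (not the PR's actual visitor code) assuming Flink's RowData.createFieldGetter API: reading a struct field requires its LogicalType up front, which is why the visitor has to carry the Flink type alongside the Iceberg type.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

class StructFieldReadSketch {
  // Illustrative sketch: RowData, ArrayData, and MapData share no SpecializedGetters-like
  // interface, so a typed getter must be built from each field's LogicalType before reading.
  static Object[] readFields(RowData row, RowType rowType) {
    Object[] fields = new Object[rowType.getFieldCount()];
    for (int i = 0; i < rowType.getFieldCount(); i++) {
      RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
      fields[i] = getter.getFieldOrNull(row);
    }
    return fields;
  }
}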
The high-level approach looks fine to me.
Ping @rdblue, could this be merged into the master branch now?
@@ -87,7 +87,7 @@ private GenericOrcWriters() {
     return UUIDWriter.INSTANCE;
   }

-  public static OrcValueWriter<byte[]> fixed() {
+  public static OrcValueWriter<byte[]> bytes() {
Should the FixedWriter class also be renamed to BytesWriter?
Sounds good.
@@ -326,7 +326,7 @@ public void nonNullWrite(int rowId, LocalDateTime data, ColumnVector output) {
   public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
     // TODO: validate precision and scale from schema
     ((DecimalColumnVector) output).vector[rowId]
-        .setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
+        .setFromLongAndScale(data.unscaledValue().longValueExact(), data.scale());
What about the TODO to check the scale matches the column's scale? As long as we're updating this, does it make sense to fix that, since we just had a decimal scale problem?
Actually, we don't need to change this now, because this merged patch has already fixed it: 6f96b36#diff-b1b07b15f036000a3f2bed76fdd9f961R334
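For reference, the validation being discussed amounts to a check like the following before writing (a sketch only; the exact form and error messages in the merged commit may differ):

import java.math.BigDecimal;

class DecimalScaleCheckSketch {
  // Fail fast when the value does not fit the column's declared decimal(precision, scale),
  // instead of silently writing a mismatched unscaled long.
  static void check(BigDecimal data, int precision, int scale) {
    if (data.scale() != scale) {
      throw new IllegalArgumentException(String.format(
          "Cannot write value as decimal(%d,%d), wrong scale: %s", precision, scale, data));
    }
    if (data.precision() > precision) {
      throw new IllegalArgumentException(String.format(
          "Cannot write value as decimal(%d,%d), too large: %s", precision, scale, data));
    }
  }
}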
ListColumnVector listVector = (ListColumnVector) vector;
int offset = (int) listVector.offsets[row];
int length = (int) listVector.lengths[row];
List<T> elements = Lists.newArrayListWithExpectedSize(length);
Can this list be reused? Allocating a new ArrayList each time could lead to poor performance.
One way to reuse the ArrayList is to make it a ThreadLocal<List>, so each thread shares the same List instance. When reading ArrayData, we get the ThreadLocal list and clear it (the ArrayList's capacity won't shrink; clearing only frees the references to the elements), then read the values into the list. One thing to consider is the size of the list: if we read an ArrayData with many elements, the ThreadLocal list may expand into a huge list too, which would waste a lot of memory. I don't have a good idea for handling that case, and I see that other ORC readers also allocate the list, so maybe we can handle this in a separate issue.
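A sketch of the reuse pattern described above (illustrative only; as noted, it was not adopted in this patch):

import java.util.ArrayList;
import java.util.List;

class ReusableListSketch {
  // One list per reader thread; clear() drops the element references but keeps the
  // backing array, so repeated reads avoid re-allocation. The trade-off is that a
  // single very large array keeps its capacity for the lifetime of the thread.
  private static final ThreadLocal<List<Object>> REUSED_LIST =
      ThreadLocal.withInitial(ArrayList::new);

  static List<Object> startList() {
    List<Object> list = REUSED_LIST.get();
    list.clear();
    return list;
  }
}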
public TemporaryFolder temp = new TemporaryFolder();

@Override
protected void writeAndValidate(Schema schema) throws IOException {
It looks like this validates records read by the reader against records written by the writer and subsequently read by the reader. I think it should validate the reader and writer separately, in two parts:
- Write using generics, read using Flink, and use assertEquals(schema, genericRow, flinkRow).
- Write using generics, read with Flink, write with Flink, read with generics, and use assertEquals(genericRow, endGenericRow).
That way, we're always comparing results against the generics that were originally generated. I think we already have the assertEquals code to do it.
Your testing method is correct, but we didn't have assertEquals(schema, genericRow, flinkRow) before Junjie's Parquet readers & writers patch got in. So I changed to another way to verify the data, sketched after this list:
- generate a list of records with a random generator;
- convert the records to a RowData list;
- write the records from step 1 to an ORC file, read them back into a RowData list, and compare with the RowData from step 2;
- write the RowData from step 2 into an ORC file, read them back into records, and compare with the records from step 1.
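In outline, that verification looks like the following (a sketch; the helper methods here are hypothetical stand-ins for the real random data generator, converters, file I/O, and the shared assertEquals helpers):

import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.data.Record;

abstract class OrcRoundTripSketch {
  // Hypothetical helpers; the real comparisons go through TestHelpers-style
  // field-by-field assertions rather than Object.equals.
  abstract List<Record> generateRandomRecords();                        // step 1
  abstract List<RowData> convertToRowData(List<Record> records);        // step 2
  abstract List<RowData> writeRecordsReadRowData(List<Record> records); // step 3: generic write, Flink read
  abstract List<Record> writeRowDataReadRecords(List<RowData> rows);    // step 4: Flink write, generic read
  abstract void assertSame(List<?> expected, List<?> actual);

  void validate() {
    List<Record> records = generateRandomRecords();
    List<RowData> rows = convertToRowData(records);
    assertSame(rows, writeRecordsReadRowData(records));   // reader checked against step 2
    assertSame(records, writeRowDataReadRecords(rows));   // writer checked against step 1
  }
}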
@openinx, sorry to block you for so long. Now it is merged. You might want to take a look.
Since your TestHelpers got merged, I don't have to write the AssertHelpers now and can reuse your work. Thanks for the work; I've updated the patch.
I've rebased the patch and rewritten the unit tests to assert the read records/RowData against the generated records/RowData. I also added ORC to the parameterized unit tests. Let's see the result from Travis. If all tests pass, @rdblue you may want to take a final look.
Looks like tests are failing because ORC wasn't added to the appender factory. Should be an easy fix?
Sure, let's see the Travis result again.
@@ -91,6 +91,12 @@ private WriteBuilder() {
       case BOOLEAN:
         return GenericOrcWriters.booleans();
       case INTEGER:
+        switch (flinkPrimitive.getTypeRoot()) {
+          case TINYINT:
+            return GenericOrcWriters.bytes();
This fixed the broken unit tests in Flink: 2ec4ce6#diff-6820fc22b4e5cbfa4a1c029bf5c8c789L255.
We may need to add a similar unit test in Spark so that we can write tinyint and smallint to Spark correctly; I will take a look.
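Completing the dispatch shown in the truncated diff above, the shape is roughly the following (a sketch; it assumes GenericOrcWriters also exposes shorts() and ints() writers, and the import paths are assumptions):

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.data.orc.GenericOrcWriters;
import org.apache.iceberg.orc.OrcValueWriter;

class IntegerWriterDispatchSketch {
  // Sketch only: Iceberg has a single INTEGER type while Flink distinguishes
  // TINYINT, SMALLINT, and INT, so the Flink type root decides which writer to use.
  static OrcValueWriter<?> forIcebergInteger(LogicalType flinkPrimitive) {
    switch (flinkPrimitive.getTypeRoot()) {
      case TINYINT:
        return GenericOrcWriters.bytes();
      case SMALLINT:
        return GenericOrcWriters.shorts();
      case INTEGER:
        return GenericOrcWriters.ints();
      default:
        throw new IllegalArgumentException("Unsupported type root: " + flinkPrimitive.getTypeRoot());
    }
  }
}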
Thanks @openinx! I merged this.