
Flink: Add Orc value reader, writer implementations #1255

Merged: 9 commits into apache:master on Aug 20, 2020

Conversation

@openinx (Member) commented Jul 27, 2020

No description provided.

@openinx (Member Author) commented Jul 29, 2020

I've fixed the broken unit test in the latest patch. It can be reviewed now and I will remove the WIP tag. Let's wait for the Travis test result.

@openinx changed the title from "[WIP] Flink: Add Orc value reader, writer implementations" to "Flink: Add Orc value reader, writer implementations" on Jul 29, 2020

@openinx (Member Author) commented Jul 30, 2020

Rebased to resolve the conflicts with #1197.

return visit(flinkType, schema.asStruct(), visitor);
}

private static <T> T visit(LogicalType flinkType, Type iType, FlinkOrcSchemaVisitor<T> visitor) {
Contributor:

It seems the flinkType here is just used for the exception message at the end of the primitive method, right?

Member Author:

The logicalType is mainly used for getting fields from list/map/struct; see ListWriter, MapWriter, and StructWriter. For example, we generate an elementGetter for ListWriter and use it to read the elements of a list.
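
For illustration, a minimal sketch of that element-getter idea (not the exact PR code; it assumes a Flink version that provides ArrayData.createElementGetter):

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.logical.LogicalType;

// Sketch only: the element type must be known up front to build a typed getter,
// which is why the visitor carries the Flink LogicalType down the tree.
class ListElementAccessSketch {
  private final ArrayData.ElementGetter elementGetter;

  ListElementAccessSketch(LogicalType elementType) {
    this.elementGetter = ArrayData.createElementGetter(elementType);
  }

  Object element(ArrayData list, int pos) {
    return elementGetter.getElementOrNull(list, pos);
  }
}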

case STRING:
return FlinkOrcWriters.strings();
case UUID:
return GenericOrcWriters.uuids();
Member Author:

For the UUID type, Flink should return GenericOrcWriters.bytes() instead of GenericOrcWriters.uuids(); I will fix this in the next patch. The reader will also need the same fix.

@openinx (Member Author) commented Aug 11, 2020

@rdsr @rdblue, any other concerns? Thanks.

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

abstract class FlinkSchemaVisitor<T> {
Contributor:

I think I understand why you needed to build a FlinkSchemaVisitor. Unlike Spark, Flink seems to have no common super interface for its internal data types (list, map, struct), like SpecializedGetters in Spark. So we have to know upfront which type to pass to the struct, map, and list writers, whereas in Spark we can simply pass the parent SpecializedGetters object and get the right data from within it.
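
To illustrate the difference (a hedged sketch, not code from this PR): in Flink the field type has to be supplied when the getter is built, for example via RowData.createFieldGetter, whereas Spark's SpecializedGetters lets a writer ask the container for a typed value directly.

import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

// Sketch only: RowData, ArrayData and MapData share no common getter interface,
// so typed field getters must be created from the LogicalType ahead of time.
class StructFieldAccessSketch {
  private final RowData.FieldGetter[] getters;

  StructFieldAccessSketch(List<LogicalType> fieldTypes) {
    this.getters = new RowData.FieldGetter[fieldTypes.size()];
    for (int i = 0; i < fieldTypes.size(); i++) {
      this.getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
    }
  }

  Object field(RowData struct, int pos) {
    return getters[pos].getFieldOrNull(struct);
  }
}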

@rdsr (Contributor) left a comment:

The high-level approach looks fine to me.

@openinx (Member Author) commented Aug 18, 2020

Ping @rdblue, could this be merged into the master branch now?

@@ -87,7 +87,7 @@ private GenericOrcWriters() {
return UUIDWriter.INSTANCE;
}

public static OrcValueWriter<byte[]> fixed() {
public static OrcValueWriter<byte[]> bytes() {
Contributor:

Should the FixedWriter class also be renamed to BytesWriter?

Member Author:

Sounds good.

@@ -326,7 +326,7 @@ public void nonNullWrite(int rowId, LocalDateTime data, ColumnVector output) {
public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
// TODO: validate precision and scale from schema
((DecimalColumnVector) output).vector[rowId]
.setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
.setFromLongAndScale(data.unscaledValue().longValueExact(), data.scale());
Contributor:

What about the TODO to check that the scale matches the column's scale? As long as we're updating this, does it make sense to fix that, since we just had a decimal scale problem?

Member Author:

Actually, we don't need to change this now, because this already-merged patch has fixed it: 6f96b36#diff-b1b07b15f036000a3f2bed76fdd9f961R334
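
For reference, the kind of check that TODO refers to would look roughly like this (a hedged sketch, assuming the writer keeps precision and scale fields and the ORC storage-api vector classes used elsewhere in this file; the actual fix lives in the commit linked above):

import java.math.BigDecimal;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;

class DecimalScaleCheckSketch {
  private final int precision;  // assumed to be known from the Iceberg schema
  private final int scale;

  DecimalScaleCheckSketch(int precision, int scale) {
    this.precision = precision;
    this.scale = scale;
  }

  void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
    // Sketch only: reject values whose scale or precision does not match the column.
    Preconditions.checkArgument(data.scale() == scale,
        "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
    Preconditions.checkArgument(data.precision() <= precision,
        "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, data);
    ((DecimalColumnVector) output).vector[rowId]
        .setFromLongAndScale(data.unscaledValue().longValueExact(), data.scale());
  }
}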

ListColumnVector listVector = (ListColumnVector) vector;
int offset = (int) listVector.offsets[row];
int length = (int) listVector.lengths[row];
List<T> elements = Lists.newArrayListWithExpectedSize(length);
Contributor:

Can this list be reused? Allocating a new ArrayList each time could lead to poor performance.

Member Author:

One way to reuse the ArrayList is to make it a ThreadLocal<List>, so each thread shares the same List instance. When reading ArrayData, we get the thread-local list and clear it (the list's backing array won't shrink; clearing only frees the references to the elements), then read values into it. One concern is the size of the list: if we read an ArrayData with many elements, the thread-local list may also expand into a huge list, which would waste a lot of memory. I don't have a good idea for handling that case, and I see other ORC readers also allocate the list, so maybe we could handle this in a separate issue.
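
A rough sketch of the ThreadLocal idea discussed above (not what the patch does; the patch keeps allocating a fresh list per row, as other ORC readers do):

import java.util.ArrayList;
import java.util.List;

// Sketch only: one reusable list per reader thread. clear() drops the element
// references but keeps the backing array, so a very large row can leave a big
// array pinned in the thread-local -- the memory concern mentioned above.
class ReusableListSketch {
  private static final ThreadLocal<List<Object>> REUSED =
      ThreadLocal.withInitial(ArrayList::new);

  static List<Object> reuse() {
    List<Object> list = REUSED.get();
    list.clear();
    return list;
  }
}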

public TemporaryFolder temp = new TemporaryFolder();

@Override
protected void writeAndValidate(Schema schema) throws IOException {
Contributor:

It looks like this validates records read by the reader against records written by the writer and then read back by the same reader. I think it should validate the reader and writer separately. I think it should have two parts:

  1. Write using generics, read using Flink, and use assertEquals(schema, genericRow, flinkRow).
  2. Write using generics, read with Flink, write with Flink, read with generics, and use assertEquals(genericRow, endGenericRow).

That way, we're always comparing results against the generics that were originally generated. I think we already have the assertEquals code to do it.

Member Author:

Your testing method is correct, but we didn't have assertEquals(schema, genericRow, flinkRow) before Junjie's Parquet readers & writers patch got in. So I changed to another way to verify the data (a structural sketch follows below):

  1. generate a List<Record> with the random generator;
  2. convert the records to a RowData list;
  3. write the records from step 1 to an ORC file, read them back into a RowData list, and compare with the RowData from step 2;
  4. write the RowData from step 2 into an ORC file, read them back into records, and compare with the records from step 1.
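
A structural sketch of that flow; the abstract helper methods are hypothetical placeholders for the generic/Flink ORC read and write paths and the field-by-field assert helpers, so only the comparison pattern is the point here:

import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

abstract class OrcRoundTripSketch {
  abstract List<Record> generateRandomRecords(Schema schema);                  // step 1
  abstract List<RowData> toRowData(Schema schema, List<Record> records);       // step 2
  abstract List<RowData> writeGenericsReadFlink(Schema schema, List<Record> records) throws IOException;
  abstract List<Record> writeFlinkReadGenerics(Schema schema, List<RowData> rows) throws IOException;
  abstract void assertRows(Schema schema, List<RowData> expected, List<RowData> actual);
  abstract void assertRecords(Schema schema, List<Record> expected, List<Record> actual);

  void writeAndValidate(Schema schema) throws IOException {
    List<Record> records = generateRandomRecords(schema);
    List<RowData> rows = toRowData(schema, records);

    // step 3: generic writer -> Flink reader, compared against the converted RowData
    assertRows(schema, rows, writeGenericsReadFlink(schema, records));

    // step 4: Flink writer -> generic reader, compared against the original records
    assertRecords(schema, records, writeFlinkReadGenerics(schema, rows));
  }
}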

Collaborator:

@openinx, sorry to block you for so long. It is merged now; you might want to take a look.

Member Author:

Since your TestHelpers got merged, I don't have to write the AssertHelpers now and can reuse your work. Thanks for the work; I've updated the patch.

@openinx (Member Author) commented Aug 19, 2020

I've rebased the patch and rewritten the unit tests to assert the read records/RowData against the generated records/RowData. I've also added ORC to the parameterized unit tests. Let's see the result from Travis. If all tests pass, @rdblue, you may want to take a final look.

@rdblue (Contributor) commented Aug 19, 2020

Looks like tests are failing because ORC wasn't added to the appender factory. Should be an easy fix?

@openinx (Member Author) commented Aug 20, 2020

Sure, let's see the Travis result again.

@@ -91,6 +91,12 @@ private WriteBuilder() {
case BOOLEAN:
return GenericOrcWriters.booleans();
case INTEGER:
switch (flinkPrimitive.getTypeRoot()) {
case TINYINT:
return GenericOrcWriters.bytes();
Member Author:

This fixed the broken unit tests in Flink: 2ec4ce6#diff-6820fc22b4e5cbfa4a1c029bf5c8c789L255.

We may need to add a similar UT in Spark so that we can write tinyint and smallint to Spark correctly; I will take a look.

@rdblue merged commit 311f2a1 into apache:master on Aug 20, 2020
@rdblue (Contributor) commented Aug 20, 2020

Thanks @openinx! I merged this.
