
Flink: Add Orc value reader, writer implementations #1255

Merged · 9 commits · Aug 20, 2020
Changes from 1 commit
Fix the broken unit tests.
openinx committed Aug 20, 2020
commit b77a71327f58803c9c0d2575a1d83facf18ffff1
@@ -98,7 +98,7 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
case UUID:
return GenericOrcWriters.uuids();
case FIXED:
return GenericOrcWriters.fixed();
return GenericOrcWriters.bytes();
case BINARY:
return GenericOrcWriters.byteBuffers();
case DECIMAL:
@@ -88,7 +88,7 @@ public static OrcValueWriter<UUID> uuids() {
return UUIDWriter.INSTANCE;
}

public static OrcValueWriter<byte[]> fixed() {
public static OrcValueWriter<byte[]> bytes() {
Contributor

Should the FixedWriter class also be renamed to BytesWriter?

Member Author

Sounds good.

return FixedWriter.INSTANCE;
}
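For reference, a minimal sketch of how the agreed rename could look, assuming the writer body stays the same as the current FixedWriter (the follow-up commit is authoritative):

```java
// Sketch only: the renamed factory and writer class discussed above. The body
// is assumed to be unchanged from FixedWriter apart from the class name.
public static OrcValueWriter<byte[]> bytes() {
  return BytesWriter.INSTANCE;
}

private static class BytesWriter implements OrcValueWriter<byte[]> {
  private static final BytesWriter INSTANCE = new BytesWriter();

  @Override
  public Class<?> getJavaClass() {
    return byte[].class;
  }

  @Override
  public void nonNullWrite(int rowId, byte[] data, ColumnVector output) {
    // Reference the byte array directly in the ORC column vector.
    ((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
  }
}
```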

@@ -337,7 +337,7 @@ public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
"Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);

((DecimalColumnVector) output).vector[rowId]
.setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
.setFromLongAndScale(data.unscaledValue().longValueExact(), data.scale());
Contributor

What about the TODO to check that the scale matches the column's scale? As long as we're updating this, does it make sense to fix that, since we just had a decimal scale problem?

Member Author

Actually, we don't need to change this now, because this merged patch has already fixed it: 6f96b36#diff-b1b07b15f036000a3f2bed76fdd9f961R334

}
}
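For context, this is roughly the kind of scale check being discussed; a hedged sketch only, since the merged patch linked above is the authoritative fix:

```java
// Rough sketch of adding a scale check alongside the existing precision check
// in Decimal18Writer; the linked merged patch (6f96b36) is authoritative.
@Override
public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
  Preconditions.checkArgument(data.scale() == scale,
      "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
  Preconditions.checkArgument(data.precision() <= precision,
      "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);
  ((DecimalColumnVector) output).vector[rowId]
      .setFromLongAndScale(data.unscaledValue().longValueExact(), data.scale());
}
```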

@@ -102,7 +102,7 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
case DOUBLE:
return OrcValueReaders.doubles();
case DATE:
return GenericOrcReaders.dates();
return FlinkOrcReaders.dates();
case TIME:
return FlinkOrcReaders.times();
case TIMESTAMP:
@@ -118,7 +118,7 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
return GenericOrcReaders.uuids();
case FIXED:
case BINARY:
return GenericOrcReaders.bytes();
return OrcValueReaders.bytes();
case DECIMAL:
Types.DecimalType decimalType = (Types.DecimalType) iPrimitive;
return FlinkOrcReaders.decimals(decimalType.precision(), decimalType.scale());
@@ -36,7 +36,6 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -49,35 +48,39 @@
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;

public class FlinkOrcReaders {
class FlinkOrcReaders {
private FlinkOrcReaders() {
}

public static OrcValueReader<StringData> strings() {
static OrcValueReader<StringData> strings() {
return StringReader.INSTANCE;
}

public static OrcValueReader<DecimalData> decimals(int precision, int scale) {
static OrcValueReader<Integer> dates() {
return DateReader.INSTANCE;
}

static OrcValueReader<DecimalData> decimals(int precision, int scale) {
if (precision <= 18) {
return new Decimal18Reader(precision, scale);
} else {
return new Decimal38Reader(precision, scale);
}
}

public static OrcValueReader<Integer> times() {
static OrcValueReader<Integer> times() {
return TimeReader.INSTANCE;
}

public static OrcValueReader<TimestampData> timestamps() {
static OrcValueReader<TimestampData> timestamps() {
return TimestampReader.INSTANCE;
}

public static OrcValueReader<TimestampData> timestampTzs() {
static OrcValueReader<TimestampData> timestampTzs() {
return TimestampTzReader.INSTANCE;
}

public static <T> OrcValueReader<ArrayData> array(OrcValueReader<T> elementReader) {
static <T> OrcValueReader<ArrayData> array(OrcValueReader<T> elementReader) {
return new ArrayReader<>(elementReader);
}

@@ -101,6 +104,15 @@ public StringData nonNullRead(ColumnVector vector, int row) {
}
}

private static class DateReader implements OrcValueReader<Integer> {
private static final DateReader INSTANCE = new DateReader();

@Override
public Integer nonNullRead(ColumnVector vector, int row) {
return (int) ((LongColumnVector) vector).vector[row];
}
}

private static class Decimal18Reader implements OrcValueReader<DecimalData> {
private final int precision;
private final int scale;
@@ -113,8 +125,6 @@ private static class Decimal18Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
Preconditions.checkArgument(precision == value.precision(), "Precision mismatched.");
Preconditions.checkArgument(scale == value.scale(), "Scale mismatched.");
return DecimalData.fromUnscaledLong(value.serialize64(value.scale()), value.precision(), value.scale());
}
}
@@ -140,8 +150,8 @@ private static class TimeReader implements OrcValueReader<Integer> {

@Override
public Integer nonNullRead(ColumnVector vector, int row) {
// Flink only support time mills, just erase micros.
long micros = ((LongColumnVector) vector).vector[row];
// Flink only support time mills, just erase micros.
return (int) (micros / 1000);
}
}
@@ -99,7 +99,7 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, LogicalType fl
case DOUBLE:
return GenericOrcWriters.doubles();
case DATE:
return GenericOrcWriters.dates();
return FlinkOrcWriters.dates();
case TIME:
return FlinkOrcWriters.times();
case TIMESTAMP:
@@ -114,9 +114,8 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, LogicalType fl
case UUID:
return GenericOrcWriters.uuids();
Member Author

For the UUID type, Flink should return GenericOrcWriters.bytes() instead of GenericOrcWriters.uuids(); will fix this in the next patch. The reader will also need to be fixed (see the sketch after this hunk).

case FIXED:
return GenericOrcWriters.fixed();
case BINARY:
return GenericOrcWriters.byteBuffers();
return GenericOrcWriters.bytes();
case DECIMAL:
Types.DecimalType decimalType = (Types.DecimalType) iPrimitive;
return FlinkOrcWriters.decimals(decimalType.scale(), decimalType.precision());
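As a reference for that follow-up, a minimal sketch of a UUID writer that stores the value as 16 bytes. The class name and encoding are assumptions here, not the eventual fix:

```java
// Hypothetical UUID-as-bytes writer illustrating the follow-up mentioned in
// the review comment above; the actual fix may encode UUIDs differently.
private static class UuidAsBytesWriter implements OrcValueWriter<UUID> {
  private static final UuidAsBytesWriter INSTANCE = new UuidAsBytesWriter();

  @Override
  public Class<?> getJavaClass() {
    return UUID.class;
  }

  @Override
  public void nonNullWrite(int rowId, UUID data, ColumnVector output) {
    // Encode the UUID as its 16 big-endian bytes and reference them in the batch.
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.putLong(data.getMostSignificantBits());
    buffer.putLong(data.getLeastSignificantBits());
    ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, 16);
  }
}
```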
@@ -19,12 +19,12 @@

package org.apache.iceberg.flink.data;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -51,6 +51,10 @@ static OrcValueWriter<?> strings() {
return StringWriter.INSTANCE;
}

static OrcValueWriter<?> dates() {
return DateWriter.INSTANCE;
}

static OrcValueWriter<?> times() {
return TimeWriter.INSTANCE;
}
@@ -99,6 +103,20 @@ public void nonNullWrite(int rowId, StringData data, ColumnVector output) {
}
}

private static class DateWriter implements OrcValueWriter<Integer> {
private static final DateWriter INSTANCE = new DateWriter();

@Override
public Class<?> getJavaClass() {
return Integer.class;
}

@Override
public void nonNullWrite(int rowId, Integer data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = data;
}
}

private static class TimeWriter implements OrcValueWriter<Integer> {
private static final TimeWriter INSTANCE = new TimeWriter();

@@ -108,8 +126,10 @@ public Class<?> getJavaClass() {
}

@Override
public void nonNullWrite(int rowId, Integer microSecond, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = microSecond;
public void nonNullWrite(int rowId, Integer millis, ColumnVector output) {
// The time in flink is in millisecond, while the standard time in iceberg is microsecond.
// So we need to transform it to microsecond.
((LongColumnVector) output).vector[rowId] = millis * 1000;
}
}

@@ -152,7 +172,7 @@ public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) {
}
}

private static class Decimal18Writer implements OrcValueWriter<BigDecimal> {
private static class Decimal18Writer implements OrcValueWriter<DecimalData> {
private final int scale;
private final int precision;

@@ -163,16 +183,16 @@ private static class Decimal18Writer implements OrcValueWriter<BigDecimal> {

@Override
public Class<?> getJavaClass() {
return BigDecimal.class;
return DecimalData.class;
}

@Override
public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
public void nonNullWrite(int rowId, DecimalData data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(data.toUnscaledLong(), data.scale());
}
}

private static class Decimal38Writer implements OrcValueWriter<BigDecimal> {
private static class Decimal38Writer implements OrcValueWriter<DecimalData> {
private final int scale;
private final int precision;

@@ -183,12 +203,12 @@ private static class Decimal38Writer implements OrcValueWriter<BigDecimal> {

@Override
public Class<?> getJavaClass() {
return BigDecimal.class;
return DecimalData.class;
}

@Override
public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false));
public void nonNullWrite(int rowId, DecimalData data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data.toBigDecimal(), false));
}
}

@@ -94,6 +94,8 @@ private static Iterable<RowData> generateRowData(Schema schema, int numRecords,
Supplier<RandomRowGenerator> supplier) {
DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
converter.open(RandomData.class.getClassLoader());

return () -> new Iterator<RowData>() {
private final RandomRowGenerator generator = supplier.get();
private int count = 0;
@@ -22,11 +22,16 @@
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
@@ -43,7 +48,25 @@ public class TestFlinkOrcReaderWriter extends DataTest {

@Override
protected void writeAndValidate(Schema schema) throws IOException {
Contributor

It looks like this validates records read by the reader against records written by the writer and subsequently read by the reader. I think it should validate the reader and writer separately. I think it should have two parts:

  1. Write using generics, read using Flink, and use assertEquals(schema, genericRow, flinkRow)
  2. Write using generics, read with Flink, write with Flink, read with generics and use assertEquals(genericRow, endGenericRow).

That way, we're always comparing results against the generics that were originally generated. I think we already have the assertEquals code to do it.
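For illustration, a compact sketch of the two-part check described above. The writeWithGenerics/readWithFlink/writeWithFlink/readWithGenerics helpers and the assertRowData comparison are hypothetical placeholders for the ORC.write/ORC.read plumbing used elsewhere in this PR:

```java
// Sketch only: the four write/read helpers and assertRowData are hypothetical
// placeholders, not utilities that exist in this patch.
List<Record> expected = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);

// Part 1: write with generics, read with Flink, and compare each Flink row
// against the generic record it was generated from.
File genericFile = writeWithGenerics(schema, expected);
List<RowData> flinkRows = readWithFlink(schema, genericFile);
for (int i = 0; i < expected.size(); i += 1) {
  assertRowData(schema.asStruct(), expected.get(i), flinkRows.get(i));
}

// Part 2: write the Flink rows with the Flink writer, read them back with the
// generic reader, and compare against the originally generated records.
File flinkFile = writeWithFlink(schema, flinkRows);
List<Record> roundTripped = readWithGenerics(schema, flinkFile);
Assert.assertEquals("Round trip should preserve the data", expected, roundTripped);
```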

Member Author

Your testing method is correct, but we don't have assertEquals(schema, genericRow, flinkRow) until Junjie's parquet readers & writers patch gets in. So I changed to another way to verify the data:

  1. generate a List<Record> with the random generator;
  2. convert the records to a RowData list;
  3. write the records from step 1 to an ORC file, read them back into a RowData list, and compare with the RowData from step 2;
  4. write the RowData from step 2 into an ORC file, read them back into records, and compare with the records from step 1.

Collaborator

@openinx, sorry to block you for so long. Now it is merged. You might want to take a look.

Member Author

Since your TestHelpers got merged, I don't have to write the AssertHelpers now and can reuse your work. Thanks for the work; I've updated the patch.

Iterable<RowData> iterable = RandomData.generateRowData(schema, NUM_RECORDS, 1990L);
List<Record> records = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);

File recordsFile = temp.newFile();
Assert.assertTrue("Delete should succeed", recordsFile.delete());

try (FileAppender<Record> writer = ORC.write(Files.localOutput(recordsFile))
.schema(schema)
.createWriterFunc(GenericOrcWriter::buildWriter)
.build()) {
writer.addAll(records);
}

List<RowData> rowDataList = Lists.newArrayList();
try (CloseableIterable<RowData> reader = ORC.read(Files.localInput(recordsFile))
.project(schema)
.createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type))
.build()) {
reader.forEach(rowDataList::add);
}

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
@@ -53,20 +76,21 @@ protected void writeAndValidate(Schema schema) throws IOException {
.schema(schema)
.createWriterFunc((iSchema, typeDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema))
.build()) {
writer.addAll(iterable);
writer.addAll(rowDataList);
}

try (CloseableIterable<RowData> reader = ORC.read(Files.localInput(testFile))
.project(schema)
.createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type))
.build()) {
Iterator<RowData> expected = iterable.iterator();
Iterator<RowData> expected = rowDataList.iterator();
Iterator<RowData> rows = reader.iterator();
for (int i = 0; i < NUM_RECORDS; i += 1) {
Assert.assertTrue("Should have expected number of rows", rows.hasNext());
Assert.assertEquals(expected.next(), rows.next());
}
Assert.assertFalse("Should not have extra rows", rows.hasNext());
Assert.assertFalse("Should not have extra rows", expected.hasNext());
}
}
}