diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index a5f2bb738960..a23fb2d6ee36 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -89,6 +89,7 @@ public ParquetValueReader<RowData> message(
     }
 
     @Override
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<RowData> struct(
         Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
       // match the expected struct's order
@@ -120,6 +121,7 @@ public ParquetValueReader<RowData> struct(
       int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
+        ParquetValueReader<?> reader = readersById.get(id);
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
           int fieldMaxDefinitionLevel =
@@ -133,15 +135,21 @@ public ParquetValueReader<RowData> struct(
         } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
           reorderedFields.add(ParquetValueReaders.constant(false));
           types.add(null);
+        } else if (reader != null) {
+          reorderedFields.add(reader);
+          types.add(typesById.get(id));
+        } else if (field.initialDefault() != null) {
+          reorderedFields.add(
+              ParquetValueReaders.constant(
+                  RowDataUtil.convertConstant(field.type(), field.initialDefault()),
+                  maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
+          types.add(typesById.get(id));
+        } else if (field.isOptional()) {
+          reorderedFields.add(ParquetValueReaders.nulls());
+          types.add(null);
         } else {
-          ParquetValueReader<?> reader = readersById.get(id);
-          if (reader != null) {
-            reorderedFields.add(reader);
-            types.add(typesById.get(id));
-          } else {
-            reorderedFields.add(ParquetValueReaders.nulls());
-            types.add(null);
-          }
+          throw new IllegalArgumentException(
+              String.format("Missing required field: %s", field.name()));
         }
       }
 
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 4cfb24f62921..6b1af0202c34 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -59,6 +59,11 @@ public class TestFlinkParquetReader extends DataTest {
 
   private static final int NUM_RECORDS = 100;
 
+  @Override
+  protected boolean supportsDefaultValues() {
+    return true;
+  }
+
   @Test
   public void testBuildReader() {
     MessageType fileSchema =
@@ -199,13 +204,14 @@ public void testTwoLevelList() throws IOException {
     }
   }
 
-  private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
+  private void writeAndValidate(
+      Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
     File testFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
-            .schema(schema)
+            .schema(writeSchema)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .build()) {
       writer.addAll(iterable);
@@ -213,15 +219,15 @@ private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws I
 
     try (CloseableIterable<RowData> reader =
         Parquet.read(Files.localInput(testFile))
-            .project(schema)
-            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .project(expectedSchema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(expectedSchema, type))
             .build()) {
       Iterator<Record> expected = iterable.iterator();
       Iterator<RowData> rows = reader.iterator();
-      LogicalType rowType = FlinkSchemaUtil.convert(schema);
+      LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
         assertThat(rows).hasNext();
-        TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), rows.next());
+        TestHelpers.assertRowData(writeSchema.asStruct(), rowType, expected.next(), rows.next());
       }
       assertThat(rows).isExhausted();
     }
@@ -229,11 +235,19 @@ private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws I
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    writeAndValidate(RandomGenericData.generate(schema, NUM_RECORDS, 19981), schema);
+    writeAndValidate(RandomGenericData.generate(schema, NUM_RECORDS, 19981), schema, schema);
     writeAndValidate(
-        RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124), schema);
+        RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124),
+        schema,
+        schema);
     writeAndValidate(
         RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20),
+        schema,
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    writeAndValidate(RandomGenericData.generate(writeSchema, 100, 0L), writeSchema, expectedSchema);
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index a5f2bb738960..a23fb2d6ee36 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -89,6 +89,7 @@ public ParquetValueReader<RowData> message(
     }
 
     @Override
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<RowData> struct(
         Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
       // match the expected struct's order
@@ -120,6 +121,7 @@ public ParquetValueReader<RowData> struct(
       int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
+        ParquetValueReader<?> reader = readersById.get(id);
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
           int fieldMaxDefinitionLevel =
@@ -133,15 +135,21 @@ public ParquetValueReader<RowData> struct(
         } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
           reorderedFields.add(ParquetValueReaders.constant(false));
           types.add(null);
+        } else if (reader != null) {
+          reorderedFields.add(reader);
+          types.add(typesById.get(id));
+        } else if (field.initialDefault() != null) {
+          reorderedFields.add(
+              ParquetValueReaders.constant(
+                  RowDataUtil.convertConstant(field.type(), field.initialDefault()),
+                  maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
+          types.add(typesById.get(id));
+        } else if (field.isOptional()) {
+          reorderedFields.add(ParquetValueReaders.nulls());
+          types.add(null);
         } else {
-          ParquetValueReader<?> reader = readersById.get(id);
-          if (reader != null) {
-            reorderedFields.add(reader);
-            types.add(typesById.get(id));
-          } else {
-            reorderedFields.add(ParquetValueReaders.nulls());
-            types.add(null);
-          }
+          throw new IllegalArgumentException(
+              String.format("Missing required field: %s", field.name()));
         }
       }
 
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 4cfb24f62921..6b1af0202c34 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -59,6 +59,11 @@ public class TestFlinkParquetReader extends DataTest {
 
   private static final int NUM_RECORDS = 100;
 
+  @Override
+  protected boolean supportsDefaultValues() {
+    return true;
+  }
+
   @Test
   public void testBuildReader() {
     MessageType fileSchema =
@@ -199,13 +204,14 @@ public void testTwoLevelList() throws IOException {
     }
   }
 
-  private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
+  private void writeAndValidate(
+      Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
     File testFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
-            .schema(schema)
+            .schema(writeSchema)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .build()) {
       writer.addAll(iterable);
@@ -213,15 +219,15 @@ private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws I
 
     try (CloseableIterable<RowData> reader =
         Parquet.read(Files.localInput(testFile))
-            .project(schema)
-            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .project(expectedSchema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(expectedSchema, type))
            .build()) {
       Iterator<Record> expected = iterable.iterator();
       Iterator<RowData> rows = reader.iterator();
-      LogicalType rowType = FlinkSchemaUtil.convert(schema);
+      LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
         assertThat(rows).hasNext();
-        TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), rows.next());
+        TestHelpers.assertRowData(writeSchema.asStruct(), rowType, expected.next(), rows.next());
       }
       assertThat(rows).isExhausted();
     }
@@ -229,11 +235,19 @@ private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws I
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    writeAndValidate(RandomGenericData.generate(schema, NUM_RECORDS, 19981), schema);
+    writeAndValidate(RandomGenericData.generate(schema, NUM_RECORDS, 19981), schema, schema);
     writeAndValidate(
-        RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124), schema);
+        RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124),
+        schema,
+        schema);
     writeAndValidate(
         RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20),
+        schema,
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    writeAndValidate(RandomGenericData.generate(writeSchema, 100, 0L), writeSchema, expectedSchema);
+  }
 }