From 1720f5c51d3e22a9845e96db22668cec37407528 Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Tue, 20 Aug 2024 19:29:18 -0700 Subject: [PATCH] Refactor toGenericRecord --- .../iceberg/avro/GenericAvroReader.java | 42 +---------- .../avro/IcebergDataToGenericRecord.java | 74 +++++++++++++++++++ .../iceberg/avro/TestReadDefaultValues.java | 41 +--------- 3 files changed, 79 insertions(+), 78 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/IcebergDataToGenericRecord.java diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index dabb20d697b0..2eb658158248 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -25,13 +25,11 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynClasses; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -143,10 +141,13 @@ public ValueReader record(Type partner, Schema record, List> f if (constant != null) { readPlan.add(Pair.of(pos, ValueReaders.constant(constant))); } else if (field.initialDefault() != null) { + readPlan.add( Pair.of( pos, - ValueReaders.constant(toGenericRecord(field.type(), field.initialDefault())))); + ValueReaders.constant( + IcebergDataToGenericRecord.toGenericRecord( + field.type(), field.initialDefault())))); } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) { readPlan.add(Pair.of(pos, ValueReaders.constant(false))); } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) { @@ -162,41 +163,6 @@ public ValueReader record(Type partner, Schema record, List> f return recordReader(readPlan, avroSchemas.get(partner), record.getFullName()); } - Object toGenericRecord(Type type, Object data) { - // Recursively convert data to GenericRecord if type is a StructType. - // TODO: Rewrite this as a visitor. - if (type instanceof Types.StructType) { - Types.StructType structType = (Types.StructType) type; - GenericData.Record genericRecord = new GenericData.Record(AvroSchemaUtil.convert(type)); - int index = 0; - for (Types.NestedField field : structType.fields()) { - genericRecord.put( - field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index))); - index++; - } - return genericRecord; - } else if (type instanceof Types.MapType) { - Types.MapType mapType = (Types.MapType) type; - Map genericMap = - Maps.newHashMapWithExpectedSize(((Map) data).size()); - for (Map.Entry entry : ((Map) data).entrySet()) { - genericMap.put( - toGenericRecord(mapType.keyType(), entry.getKey()), - toGenericRecord(mapType.valueType(), entry.getValue())); - } - return genericMap; - } else if (type instanceof Types.ListType) { - Types.ListType listType = (Types.ListType) type; - List genericList = Lists.newArrayListWithExpectedSize(((List) data).size()); - for (Object element : (List) data) { - genericList.add(toGenericRecord(listType.elementType(), element)); - } - return genericList; - } else { - return data; - } - } - @SuppressWarnings("unchecked") private ValueReader recordReader( List>> readPlan, Schema avroSchema, String recordName) { diff --git a/core/src/main/java/org/apache/iceberg/avro/IcebergDataToGenericRecord.java b/core/src/main/java/org/apache/iceberg/avro/IcebergDataToGenericRecord.java new file mode 100644 index 000000000000..eda97879183b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/IcebergDataToGenericRecord.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** Convert Iceberg data to Avro GenericRecord. */ +public class IcebergDataToGenericRecord { + private IcebergDataToGenericRecord() {} + + /** + * Convert Iceberg data to Avro GenericRecord. + * + * @param type Iceberg type + * @param data Iceberg data + * @return Avro GenericRecord + */ + public static Object toGenericRecord(Type type, Object data) { + switch (type.typeId()) { + case STRUCT: + Types.StructType structType = (Types.StructType) type; + GenericData.Record genericRecord = new GenericData.Record(AvroSchemaUtil.convert(type)); + int index = 0; + for (Types.NestedField field : structType.fields()) { + genericRecord.put( + field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index))); + index++; + } + return genericRecord; + case LIST: + Types.ListType listType = (Types.ListType) type; + List genericList = Lists.newArrayListWithExpectedSize(((List) data).size()); + for (Object element : (List) data) { + genericList.add(toGenericRecord(listType.elementType(), element)); + } + return genericList; + case MAP: + Types.MapType mapType = (Types.MapType) type; + Map genericMap = + Maps.newHashMapWithExpectedSize(((Map) data).size()); + for (Map.Entry entry : ((Map) data).entrySet()) { + genericMap.put( + toGenericRecord(mapType.keyType(), entry.getKey()), + toGenericRecord(mapType.valueType(), entry.getValue())); + } + return genericMap; + default: + return data; + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java b/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java index f06d04b8925f..50be205f654e 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java @@ -24,15 +24,12 @@ import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.avro.generic.GenericData.Record; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.SingleValueParser; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -127,7 +124,7 @@ public void testDefaultValueApplied() throws IOException { Record expectedRecord = new Record(AvroSchemaUtil.convert(readerSchema.asStruct())); expectedRecord.put(0, 1); - expectedRecord.put(1, toGenericRecord(type, defaultValue)); + expectedRecord.put(1, IcebergDataToGenericRecord.toGenericRecord(type, defaultValue)); File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); @@ -198,40 +195,4 @@ public void testDefaultValueNotApplied() throws IOException { AvroTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0)); } } - - // TODO: Merge the conversion mechanism with the end state of - // ValueReaders.ConstantReader.toGenericRecord(). - private Object toGenericRecord(Type type, Object data) { - // Recursively convert data to GenericRecord if type is a StructType. - if (type instanceof Types.StructType) { - Types.StructType structType = (Types.StructType) type; - Record genericRecord = new Record(AvroSchemaUtil.convert(type)); - int index = 0; - for (Types.NestedField field : structType.fields()) { - genericRecord.put( - field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index))); - index++; - } - return genericRecord; - } else if (type instanceof Types.MapType) { - Types.MapType mapType = (Types.MapType) type; - Map genericMap = - Maps.newHashMapWithExpectedSize(((Map) data).size()); - for (Map.Entry entry : ((Map) data).entrySet()) { - genericMap.put( - toGenericRecord(mapType.keyType(), entry.getKey()), - toGenericRecord(mapType.valueType(), entry.getValue())); - } - return genericMap; - } else if (type instanceof Types.ListType) { - Types.ListType listType = (Types.ListType) type; - List genericList = Lists.newArrayListWithExpectedSize(((List) data).size()); - for (Object element : (List) data) { - genericList.add(toGenericRecord(listType.elementType(), element)); - } - return genericList; - } else { - return data; - } - } }