Skip to content

Commit

Permalink
Refactor toGenericRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
Walaa Eldin Moustafa committed Aug 21, 2024
1 parent eaaa1b1 commit 1720f5c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 78 deletions.
42 changes: 4 additions & 38 deletions core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -143,10 +141,13 @@ public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> f
if (constant != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
} else if (field.initialDefault() != null) {

readPlan.add(
Pair.of(
pos,
ValueReaders.constant(toGenericRecord(field.type(), field.initialDefault()))));
ValueReaders.constant(
IcebergDataToGenericRecord.toGenericRecord(
field.type(), field.initialDefault()))));
} else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
} else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
Expand All @@ -162,41 +163,6 @@ public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> f
return recordReader(readPlan, avroSchemas.get(partner), record.getFullName());
}

/**
 * Recursively converts an Iceberg value into the form stored in Avro generic data.
 *
 * <p>Struct values are copied field by field into a new {@code GenericData.Record} whose schema
 * comes from {@code AvroSchemaUtil.convert(type)}; map and list values are copied into new
 * collections with their keys, values, and elements converted; any other value is returned as is.
 *
 * <p>NOTE(review): for struct, map, and list types {@code data} is cast and dereferenced without
 * a null check, so a null value of those types fails here - confirm callers never pass one.
 *
 * @param type Iceberg type describing {@code data}
 * @param data value to convert; read as a {@code GenericRecord}, {@code Map}, or {@code List}
 *     when {@code type} is a struct, map, or list respectively
 * @return the converted record, map, or list, or {@code data} itself for every other type
 */
Object toGenericRecord(Type type, Object data) {
// Recursively convert data to GenericRecord if type is a StructType.
// TODO: Rewrite this as a visitor.
if (type instanceof Types.StructType) {
Types.StructType structType = (Types.StructType) type;
GenericData.Record genericRecord = new GenericData.Record(AvroSchemaUtil.convert(type));
// Values are read from the source record by position and written to the Avro record by name.
int index = 0;
for (Types.NestedField field : structType.fields()) {
genericRecord.put(
field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index)));
index++;
}
return genericRecord;
} else if (type instanceof Types.MapType) {
Types.MapType mapType = (Types.MapType) type;
Map<Object, Object> genericMap =
Maps.newHashMapWithExpectedSize(((Map<Object, Object>) data).size());
// Both keys and values are converted, each against its own declared type.
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) data).entrySet()) {
genericMap.put(
toGenericRecord(mapType.keyType(), entry.getKey()),
toGenericRecord(mapType.valueType(), entry.getValue()));
}
return genericMap;
} else if (type instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) type;
List<Object> genericList = Lists.newArrayListWithExpectedSize(((List<Object>) data).size());
for (Object element : (List<Object>) data) {
genericList.add(toGenericRecord(listType.elementType(), element));
}
return genericList;
} else {
// Not a struct, map, or list: the value is handed back untouched.
return data;
}
}

@SuppressWarnings("unchecked")
private ValueReader<?> recordReader(
List<Pair<Integer, ValueReader<?>>> readPlan, Schema avroSchema, String recordName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
 * Converts Iceberg in-memory values into the representation used by Avro generic data.
 *
 * <p>Struct values become {@link GenericData.Record}s, lists and maps are copied with their
 * contents converted recursively, and every other value is passed through unchanged.
 */
public class IcebergDataToGenericRecord {
  private IcebergDataToGenericRecord() {}

  /**
   * Recursively converts an Iceberg value to its Avro generic counterpart.
   *
   * <ul>
   *   <li>{@code STRUCT}: {@code data} is read as a {@link GenericRecord} and copied, field by
   *       field, into a new {@link GenericData.Record} built from {@code
   *       AvroSchemaUtil.convert(type)}.
   *   <li>{@code LIST}: returns a new list holding the converted elements.
   *   <li>{@code MAP}: returns a new map holding the converted keys and values.
   *   <li>Any other type: returns {@code data} itself.
   * </ul>
   *
   * <p>A {@code null} value is returned as {@code null} for every type, so unset nested values
   * (for example an optional struct field inside a default value) do not fail the conversion.
   *
   * @param type Iceberg type describing {@code data}
   * @param data Iceberg value to convert, may be {@code null}
   * @return the converted record, list, or map; {@code data} itself for all other types; or
   *     {@code null} when {@code data} is {@code null}
   */
  public static Object toGenericRecord(Type type, Object data) {
    // Null carries no structure to convert; returning early keeps the casts below from failing.
    if (data == null) {
      return null;
    }

    switch (type.typeId()) {
      case STRUCT:
        Types.StructType structType = (Types.StructType) type;
        GenericRecord icebergRecord = (GenericRecord) data;
        GenericData.Record genericRecord = new GenericData.Record(AvroSchemaUtil.convert(type));
        // Source values are read by position; the Avro record is populated by field name.
        int index = 0;
        for (Types.NestedField field : structType.fields()) {
          genericRecord.put(field.name(), toGenericRecord(field.type(), icebergRecord.get(index)));
          index++;
        }
        return genericRecord;
      case LIST:
        Types.ListType listType = (Types.ListType) type;
        // Wildcard cast: elements are only read as Object, so no unchecked cast is needed.
        List<?> elements = (List<?>) data;
        List<Object> genericList = Lists.newArrayListWithExpectedSize(elements.size());
        for (Object element : elements) {
          genericList.add(toGenericRecord(listType.elementType(), element));
        }
        return genericList;
      case MAP:
        Types.MapType mapType = (Types.MapType) type;
        Map<?, ?> entries = (Map<?, ?>) data;
        Map<Object, Object> genericMap = Maps.newHashMapWithExpectedSize(entries.size());
        // Keys and values are each converted against their own declared type.
        for (Map.Entry<?, ?> entry : entries.entrySet()) {
          genericMap.put(
              toGenericRecord(mapType.keyType(), entry.getKey()),
              toGenericRecord(mapType.valueType(), entry.getValue()));
        }
        return genericMap;
      default:
        return data;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SingleValueParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
Expand Down Expand Up @@ -127,7 +124,7 @@ public void testDefaultValueApplied() throws IOException {

Record expectedRecord = new Record(AvroSchemaUtil.convert(readerSchema.asStruct()));
expectedRecord.put(0, 1);
expectedRecord.put(1, toGenericRecord(type, defaultValue));
expectedRecord.put(1, IcebergDataToGenericRecord.toGenericRecord(type, defaultValue));

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
Expand Down Expand Up @@ -198,40 +195,4 @@ public void testDefaultValueNotApplied() throws IOException {
AvroTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0));
}
}

// Test-local helper that recursively converts an Iceberg value into the Avro generic form
// (Record for structs, new Map/List copies for maps and lists, the value itself otherwise).
// NOTE(review): struct, map, and list values are cast and dereferenced without a null check.
// TODO: Merge the conversion mechanism with the end state of
// ValueReaders.ConstantReader.toGenericRecord().
private Object toGenericRecord(Type type, Object data) {
// Recursively convert data to GenericRecord if type is a StructType.
if (type instanceof Types.StructType) {
Types.StructType structType = (Types.StructType) type;
Record genericRecord = new Record(AvroSchemaUtil.convert(type));
// Values are read from the source record by position and written to the Avro record by name.
int index = 0;
for (Types.NestedField field : structType.fields()) {
genericRecord.put(
field.name(), toGenericRecord(field.type(), ((GenericRecord) data).get(index)));
index++;
}
return genericRecord;
} else if (type instanceof Types.MapType) {
Types.MapType mapType = (Types.MapType) type;
Map<Object, Object> genericMap =
Maps.newHashMapWithExpectedSize(((Map<Object, Object>) data).size());
// Both keys and values are converted, each against its own declared type.
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) data).entrySet()) {
genericMap.put(
toGenericRecord(mapType.keyType(), entry.getKey()),
toGenericRecord(mapType.valueType(), entry.getValue()));
}
return genericMap;
} else if (type instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) type;
List<Object> genericList = Lists.newArrayListWithExpectedSize(((List<Object>) data).size());
for (Object element : (List<Object>) data) {
genericList.add(toGenericRecord(listType.elementType(), element));
}
return genericList;
} else {
// Not a struct, map, or list: the value is handed back untouched.
return data;
}
}
}

0 comments on commit 1720f5c

Please sign in to comment.