Skip to content

Commit 0c4e9ef

Browse files
committed
fix(binder): enhance RecordBinder and TypeAdapter to support STRUCT type conversion
1 parent 9594642 commit 0c4e9ef

File tree

8 files changed

+497
-132
lines changed

8 files changed

+497
-132
lines changed

core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@
4848
*/
4949
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {
5050

51+
5152
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
5253
@Override
53-
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
54+
public Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter<S> structConverter) {
5455
if (sourceValue == null) {
5556
return null;
5657
}
@@ -86,6 +87,8 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
8687
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
8788
case MAP:
8889
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
90+
case STRUCT:
91+
return structConverter.convert(sourceValue, sourceSchema, targetType);
8992
default:
9093
return sourceValue;
9194
}

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,26 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
185185
}
186186
return adaptedMap;
187187
}
188+
189+
@Override
190+
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
191+
return convert(sourceValue, sourceSchema, targetType, this::convertStruct);
192+
}
193+
194+
protected Object convertStruct(Object sourceValue, Schema sourceSchema, Type targetType) {
195+
org.apache.iceberg.Schema schema = targetType.asStructType().asSchema();
196+
org.apache.iceberg.data.GenericRecord result = org.apache.iceberg.data.GenericRecord.create(schema);
197+
for (Types.NestedField f : schema.columns()) {
198+
// Convert the value to the expected type
199+
GenericRecord record = (GenericRecord) sourceValue;
200+
Schema.Field sourceField = sourceSchema.getField(f.name());
201+
if (sourceField == null) {
202+
throw new IllegalStateException("Missing field '" + f.name()
203+
+ "' in source schema: " + sourceSchema.getFullName());
204+
}
205+
Object fieldValue = convert(record.get(f.name()), sourceField.schema(), f.type());
206+
result.setField(f.name(), fieldValue);
207+
}
208+
return result;
209+
}
188210
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.binder;
20+
21+
import org.apache.avro.Schema;
22+
import org.apache.iceberg.types.Type;
23+
24+
/**
25+
* Represents the mapping between an Avro field and its corresponding Iceberg field.
26+
* This class stores the position, key, schema, and type information needed to
27+
* convert field values during record binding.
28+
*/
29+
public class FieldMapping {
30+
private final int avroPosition;
31+
private final String avroKey;
32+
private final Type icebergType;
33+
private final Schema avroSchema;
34+
35+
public FieldMapping(int avroPosition, String avroKey, Type icebergType, Schema avroSchema) {
36+
this.avroPosition = avroPosition;
37+
this.avroKey = avroKey;
38+
this.icebergType = icebergType;
39+
this.avroSchema = avroSchema;
40+
}
41+
42+
public int avroPosition() {
43+
return avroPosition;
44+
}
45+
46+
public String avroKey() {
47+
return avroKey;
48+
}
49+
50+
public Type icebergType() {
51+
return icebergType;
52+
}
53+
54+
public Schema avroSchema() {
55+
return avroSchema;
56+
}
57+
}

core/src/main/java/kafka/automq/table/binder/RecordBinder.java

Lines changed: 75 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.nio.ByteBuffer;
3232
import java.util.HashMap;
33+
import java.util.IdentityHashMap;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.concurrent.atomic.AtomicLong;
@@ -48,7 +49,7 @@ public class RecordBinder {
4849
private final FieldMapping[] fieldMappings;
4950

5051
// Pre-computed RecordBinders for nested STRUCT fields
51-
private final Map<String, RecordBinder> nestedStructBinders;
52+
private final Map<Schema, RecordBinder> nestedStructBinders;
5253

5354
// Field count statistics for this batch
5455
private final AtomicLong batchFieldCount;
@@ -78,11 +79,9 @@ public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema,
7879
}
7980

8081
// Initialize field mappings
81-
this.fieldMappings = new FieldMapping[icebergSchema.columns().size()];
82-
initializeFieldMappings(avroSchema);
83-
82+
this.fieldMappings = buildFieldMappings(avroSchema, icebergSchema);
8483
// Pre-compute nested struct binders
85-
this.nestedStructBinders = precomputeNestedStructBinders(typeAdapter);
84+
this.nestedStructBinders = precomputeBindersMap(typeAdapter);
8685
}
8786

8887
public RecordBinder createBinderForNewSchema(org.apache.iceberg.Schema icebergSchema, Schema avroSchema) {
@@ -121,8 +120,9 @@ void addFieldCount(long count) {
121120
batchFieldCount.addAndGet(count);
122121
}
123122

124-
private void initializeFieldMappings(Schema avroSchema) {
123+
private FieldMapping[] buildFieldMappings(Schema avroSchema, org.apache.iceberg.Schema icebergSchema) {
125124
Schema recordSchema = avroSchema;
125+
FieldMapping[] mappings = new FieldMapping[icebergSchema.columns().size()];
126126

127127
if (recordSchema.getType() == Schema.Type.UNION) {
128128
recordSchema = recordSchema.getTypes().stream()
@@ -137,32 +137,28 @@ private void initializeFieldMappings(Schema avroSchema) {
137137

138138
Schema.Field avroField = recordSchema.getField(fieldName);
139139
if (avroField != null) {
140-
fieldMappings[icebergPos] = createOptimizedMapping(
140+
mappings[icebergPos] = buildFieldMapping(
141141
avroField.name(),
142142
avroField.pos(),
143143
icebergField.type(),
144144
avroField.schema()
145145
);
146146
} else {
147-
fieldMappings[icebergPos] = null;
147+
mappings[icebergPos] = null;
148148
}
149149
}
150+
return mappings;
150151
}
151152

152-
private FieldMapping createOptimizedMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
153-
org.apache.iceberg.Schema nestedSchema = null;
154-
String nestedSchemaId = null;
155-
if (icebergType.isStructType()) {
156-
nestedSchema = icebergType.asStructType().asSchema();
157-
nestedSchemaId = icebergType.toString();
158-
}
153+
private FieldMapping buildFieldMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
159154
if (Type.TypeID.TIMESTAMP.equals(icebergType.typeId())
160155
|| Type.TypeID.TIME.equals(icebergType.typeId())
161156
|| Type.TypeID.MAP.equals(icebergType.typeId())
162-
|| Type.TypeID.LIST.equals(icebergType.typeId())) {
157+
|| Type.TypeID.LIST.equals(icebergType.typeId())
158+
|| Type.TypeID.STRUCT.equals(icebergType.typeId())) {
163159
avroType = resolveUnionElement(avroType);
164160
}
165-
return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
161+
return new FieldMapping(avroPosition, avroFieldName, icebergType, avroType);
166162
}
167163

168164
private Schema resolveUnionElement(Schema schema) {
@@ -183,24 +179,55 @@ private Schema resolveUnionElement(Schema schema) {
183179
/**
184180
* Pre-computes RecordBinders for nested STRUCT fields.
185181
*/
186-
private Map<String, RecordBinder> precomputeNestedStructBinders(TypeAdapter<Schema> typeAdapter) {
187-
Map<String, RecordBinder> binders = new HashMap<>();
182+
private Map<Schema, RecordBinder> precomputeBindersMap(TypeAdapter<Schema> typeAdapter) {
183+
Map<Schema, RecordBinder> binders = new IdentityHashMap<>();
188184

189185
for (FieldMapping mapping : fieldMappings) {
190-
if (mapping != null && mapping.typeId() == Type.TypeID.STRUCT) {
191-
String structId = mapping.nestedSchemaId();
192-
if (!binders.containsKey(structId)) {
193-
RecordBinder nestedBinder = new RecordBinder(
194-
mapping.nestedSchema(),
186+
if (mapping != null) {
187+
Type type = mapping.icebergType();
188+
if (type.isPrimitiveType()) {
189+
} else if (type.isStructType()) {
190+
org.apache.iceberg.Schema schema = type.asStructType().asSchema();
191+
RecordBinder structBinder = new RecordBinder(
192+
schema,
195193
mapping.avroSchema(),
196194
typeAdapter,
197195
batchFieldCount
198196
);
199-
binders.put(structId, nestedBinder);
197+
binders.put(mapping.avroSchema(), structBinder);
198+
} else if (type.isListType()) {
199+
Types.ListType listType = type.asListType();
200+
Type elementType = listType.elementType();
201+
if (elementType.isStructType()) {
202+
org.apache.iceberg.Schema schema = elementType.asStructType().asSchema();
203+
RecordBinder elementBinder = new RecordBinder(
204+
schema,
205+
mapping.avroSchema().getElementType(),
206+
typeAdapter,
207+
batchFieldCount
208+
);
209+
binders.put(mapping.avroSchema().getElementType(), elementBinder);
210+
}
211+
} else if (type.isMapType()) {
212+
Types.MapType mapType = type.asMapType();
213+
Type keyType = mapType.keyType();
214+
Type valueType = mapType.valueType();
215+
if (keyType.isStructType()) {
216+
throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
217+
}
218+
if (valueType.isStructType()) {
219+
org.apache.iceberg.Schema schema = valueType.asStructType().asSchema();
220+
RecordBinder valueBinder = new RecordBinder(
221+
schema,
222+
mapping.avroSchema().getValueType(),
223+
typeAdapter,
224+
batchFieldCount
225+
);
226+
binders.put(mapping.avroSchema().getValueType(), valueBinder);
227+
}
200228
}
201229
}
202230
}
203-
204231
return binders;
205232
}
206233

@@ -210,16 +237,16 @@ private static class AvroRecordView implements Record {
210237
private final TypeAdapter<Schema> typeAdapter;
211238
private final Map<String, Integer> fieldNameToPosition;
212239
private final FieldMapping[] fieldMappings;
213-
private final Map<String, RecordBinder> nestedStructBinders;
240+
private final Map<Schema, RecordBinder> nestedStructBinders;
214241
private final RecordBinder parentBinder;
215242

216243
AvroRecordView(GenericRecord avroRecord,
217-
org.apache.iceberg.Schema icebergSchema,
218-
TypeAdapter<Schema> typeAdapter,
219-
Map<String, Integer> fieldNameToPosition,
220-
FieldMapping[] fieldMappings,
221-
Map<String, RecordBinder> nestedStructBinders,
222-
RecordBinder parentBinder) {
244+
org.apache.iceberg.Schema icebergSchema,
245+
TypeAdapter<Schema> typeAdapter,
246+
Map<String, Integer> fieldNameToPosition,
247+
FieldMapping[] fieldMappings,
248+
Map<Schema, RecordBinder> nestedStructBinders,
249+
RecordBinder parentBinder) {
223250
this.avroRecord = avroRecord;
224251
this.icebergSchema = icebergSchema;
225252
this.typeAdapter = typeAdapter;
@@ -242,25 +269,11 @@ public Object get(int pos) {
242269
if (mapping == null) {
243270
return null;
244271
}
245-
246272
Object avroValue = avroRecord.get(mapping.avroPosition());
247273
if (avroValue == null) {
248274
return null;
249275
}
250-
251-
// Handle STRUCT type - delegate to nested binder
252-
if (mapping.typeId() == Type.TypeID.STRUCT) {
253-
String structId = mapping.nestedSchemaId();
254-
RecordBinder nestedBinder = nestedStructBinders.get(structId);
255-
if (nestedBinder == null) {
256-
throw new IllegalStateException("Nested binder not found for struct: " + structId);
257-
}
258-
parentBinder.addFieldCount(1);
259-
return nestedBinder.bind((GenericRecord) avroValue);
260-
}
261-
262-
// Convert non-STRUCT types
263-
Object result = typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType());
276+
Object result = convert(avroValue, mapping.avroSchema(), mapping.icebergType());
264277

265278
// Calculate and accumulate field count
266279
long fieldCount = calculateFieldCount(result, mapping.icebergType());
@@ -269,6 +282,17 @@ public Object get(int pos) {
269282
return result;
270283
}
271284

285+
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
286+
if (targetType.typeId() == Type.TypeID.STRUCT) {
287+
RecordBinder binder = nestedStructBinders.get(sourceSchema);
288+
if (binder == null) {
289+
throw new IllegalStateException("Missing nested binder for schema: " + sourceSchema);
290+
}
291+
return binder.bind((GenericRecord) sourceValue);
292+
}
293+
return typeAdapter.convert(sourceValue, (Schema) sourceSchema, targetType, this::convert);
294+
}
295+
272296
/**
273297
* Calculates the field count for a converted value based on its size.
274298
* Large fields are counted multiple times based on the size threshold.
@@ -358,66 +382,20 @@ public <T> T get(int pos, Class<T> javaClass) {
358382
public void setField(String name, Object value) {
359383
throw new UnsupportedOperationException("Read-only");
360384
}
385+
361386
@Override
362387
public Record copy() {
363388
throw new UnsupportedOperationException("Read-only");
364389
}
390+
365391
@Override
366392
public Record copy(Map<String, Object> overwriteValues) {
367393
throw new UnsupportedOperationException("Read-only");
368394
}
395+
369396
@Override
370397
public <T> void set(int pos, T value) {
371398
throw new UnsupportedOperationException("Read-only");
372399
}
373400
}
374-
375-
// Field mapping structure
376-
private static class FieldMapping {
377-
private final int avroPosition;
378-
private final String avroKey;
379-
private final Type icebergType;
380-
private final Type.TypeID typeId;
381-
private final Schema avroSchema;
382-
private final org.apache.iceberg.Schema nestedSchema;
383-
private final String nestedSchemaId;
384-
385-
FieldMapping(int avroPosition, String avroKey, Type icebergType, Type.TypeID typeId, Schema avroSchema, org.apache.iceberg.Schema nestedSchema, String nestedSchemaId) {
386-
this.avroPosition = avroPosition;
387-
this.avroKey = avroKey;
388-
this.icebergType = icebergType;
389-
this.typeId = typeId;
390-
this.avroSchema = avroSchema;
391-
this.nestedSchema = nestedSchema;
392-
this.nestedSchemaId = nestedSchemaId;
393-
}
394-
395-
public int avroPosition() {
396-
return avroPosition;
397-
}
398-
399-
public String avroKey() {
400-
return avroKey;
401-
}
402-
403-
public Type icebergType() {
404-
return icebergType;
405-
}
406-
407-
public Type.TypeID typeId() {
408-
return typeId;
409-
}
410-
411-
public Schema avroSchema() {
412-
return avroSchema;
413-
}
414-
415-
public org.apache.iceberg.Schema nestedSchema() {
416-
return nestedSchema;
417-
}
418-
419-
public String nestedSchemaId() {
420-
return nestedSchemaId;
421-
}
422-
}
423401
}

0 commit comments

Comments
 (0)