Skip to content

Commit 9154976

Browse files
committed
feat(binder): enhance RecordBinder and TypeAdapter to support STRUCT type conversion
1 parent 9594642 commit 9154976

File tree

7 files changed

+258
-130
lines changed

7 files changed

+258
-130
lines changed

core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@
4848
*/
4949
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {
5050

51+
5152
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
5253
@Override
53-
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
54+
public Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter<S> structConverter) {
5455
if (sourceValue == null) {
5556
return null;
5657
}
@@ -86,6 +87,8 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
8687
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
8788
case MAP:
8889
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
90+
case STRUCT:
91+
return structConverter.convert(sourceValue, sourceSchema, targetType);
8992
default:
9093
return sourceValue;
9194
}

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,21 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
185185
}
186186
return adaptedMap;
187187
}
188+
189+
@Override
190+
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
191+
return super.convert(sourceValue, sourceSchema, targetType, this::convertStruct);
192+
}
193+
194+
protected Object convertStruct(Object sourceValue, Schema sourceSchema, Type targetType) {
195+
org.apache.iceberg.Schema schema = targetType.asStructType().asSchema();
196+
org.apache.iceberg.data.GenericRecord result = org.apache.iceberg.data.GenericRecord.create(schema);
197+
for (Types.NestedField f : schema.columns()) {
198+
// Convert the value to the expected type
199+
GenericRecord record = (GenericRecord) sourceValue;
200+
Object fieldValue = convert(record.get(f.name()), sourceSchema.getField(f.name()).schema(), f.type());
201+
result.setField(f.name(), fieldValue);
202+
}
203+
return result;
204+
}
188205
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package kafka.automq.table.binder;
2+
3+
import org.apache.avro.Schema;
4+
import org.apache.iceberg.types.Type;
5+
6+
public class FieldMapping {
7+
private final int avroPosition;
8+
private final String avroKey;
9+
private final Type icebergType;
10+
private final Schema avroSchema;
11+
12+
public FieldMapping(int avroPosition, String avroKey, Type icebergType, Schema avroSchema) {
13+
this.avroPosition = avroPosition;
14+
this.avroKey = avroKey;
15+
this.icebergType = icebergType;
16+
this.avroSchema = avroSchema;
17+
}
18+
19+
public int avroPosition() {
20+
return avroPosition;
21+
}
22+
23+
public String avroKey() {
24+
return avroKey;
25+
}
26+
27+
public Type icebergType() {
28+
return icebergType;
29+
}
30+
31+
public Schema avroSchema() {
32+
return avroSchema;
33+
}
34+
}

core/src/main/java/kafka/automq/table/binder/RecordBinder.java

Lines changed: 66 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121

2222
import kafka.automq.table.metric.FieldMetric;
23-
2423
import org.apache.avro.Schema;
2524
import org.apache.avro.generic.GenericRecord;
2625
import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -78,11 +77,9 @@ public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema,
7877
}
7978

8079
// Initialize field mappings
81-
this.fieldMappings = new FieldMapping[icebergSchema.columns().size()];
82-
initializeFieldMappings(avroSchema);
83-
80+
this.fieldMappings = buildFieldMappings(avroSchema, icebergSchema);
8481
// Pre-compute nested struct binders
85-
this.nestedStructBinders = precomputeNestedStructBinders(typeAdapter);
82+
this.nestedStructBinders = precomputeBindersMap(typeAdapter);
8683
}
8784

8885
public RecordBinder createBinderForNewSchema(org.apache.iceberg.Schema icebergSchema, Schema avroSchema) {
@@ -121,8 +118,9 @@ void addFieldCount(long count) {
121118
batchFieldCount.addAndGet(count);
122119
}
123120

124-
private void initializeFieldMappings(Schema avroSchema) {
121+
private FieldMapping[] buildFieldMappings(Schema avroSchema, org.apache.iceberg.Schema icebergSchema) {
125122
Schema recordSchema = avroSchema;
123+
FieldMapping[] mappings = new FieldMapping[icebergSchema.columns().size()];
126124

127125
if (recordSchema.getType() == Schema.Type.UNION) {
128126
recordSchema = recordSchema.getTypes().stream()
@@ -137,32 +135,27 @@ private void initializeFieldMappings(Schema avroSchema) {
137135

138136
Schema.Field avroField = recordSchema.getField(fieldName);
139137
if (avroField != null) {
140-
fieldMappings[icebergPos] = createOptimizedMapping(
138+
mappings[icebergPos] = buildFieldMapping(
141139
avroField.name(),
142140
avroField.pos(),
143141
icebergField.type(),
144142
avroField.schema()
145143
);
146144
} else {
147-
fieldMappings[icebergPos] = null;
145+
mappings[icebergPos] = null;
148146
}
149147
}
148+
return mappings;
150149
}
151150

152-
private FieldMapping createOptimizedMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
153-
org.apache.iceberg.Schema nestedSchema = null;
154-
String nestedSchemaId = null;
155-
if (icebergType.isStructType()) {
156-
nestedSchema = icebergType.asStructType().asSchema();
157-
nestedSchemaId = icebergType.toString();
158-
}
151+
private FieldMapping buildFieldMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
159152
if (Type.TypeID.TIMESTAMP.equals(icebergType.typeId())
160153
|| Type.TypeID.TIME.equals(icebergType.typeId())
161154
|| Type.TypeID.MAP.equals(icebergType.typeId())
162155
|| Type.TypeID.LIST.equals(icebergType.typeId())) {
163156
avroType = resolveUnionElement(avroType);
164157
}
165-
return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
158+
return new FieldMapping(avroPosition, avroFieldName, icebergType, avroType);
166159
}
167160

168161
private Schema resolveUnionElement(Schema schema) {
@@ -183,24 +176,55 @@ private Schema resolveUnionElement(Schema schema) {
183176
/**
184177
* Pre-computes RecordBinders for nested STRUCT fields.
185178
*/
186-
private Map<String, RecordBinder> precomputeNestedStructBinders(TypeAdapter<Schema> typeAdapter) {
179+
private Map<String, RecordBinder> precomputeBindersMap(TypeAdapter<Schema> typeAdapter) {
187180
Map<String, RecordBinder> binders = new HashMap<>();
188181

189182
for (FieldMapping mapping : fieldMappings) {
190-
if (mapping != null && mapping.typeId() == Type.TypeID.STRUCT) {
191-
String structId = mapping.nestedSchemaId();
192-
if (!binders.containsKey(structId)) {
193-
RecordBinder nestedBinder = new RecordBinder(
194-
mapping.nestedSchema(),
183+
if (mapping != null) {
184+
Type type = mapping.icebergType();
185+
if (type.isPrimitiveType()) {
186+
} else if (type.isStructType()) {
187+
org.apache.iceberg.Schema schema = type.asStructType().asSchema();
188+
RecordBinder structBinder = new RecordBinder(
189+
schema,
195190
mapping.avroSchema(),
196191
typeAdapter,
197192
batchFieldCount
198193
);
199-
binders.put(structId, nestedBinder);
194+
binders.put(mapping.avroSchema().getFullName(), structBinder);
195+
} else if (type.isListType()) {
196+
Types.ListType listType = type.asListType();
197+
Type elementType = listType.elementType();
198+
if (elementType.isStructType()) {
199+
org.apache.iceberg.Schema schema = elementType.asStructType().asSchema();
200+
RecordBinder elementBinder = new RecordBinder(
201+
schema,
202+
mapping.avroSchema().getElementType(),
203+
typeAdapter,
204+
batchFieldCount
205+
);
206+
binders.put(mapping.avroSchema().getElementType().getFullName(), elementBinder);
207+
}
208+
} else if (type.isMapType()) {
209+
Types.MapType mapType = type.asMapType();
210+
Type keyType = mapType.keyType();
211+
Type valueType = mapType.valueType();
212+
if (keyType.isStructType()) {
213+
throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
214+
}
215+
if (valueType.isStructType()) {
216+
org.apache.iceberg.Schema schema = valueType.asStructType().asSchema();
217+
RecordBinder valueBinder = new RecordBinder(
218+
schema,
219+
mapping.avroSchema().getValueType(),
220+
typeAdapter,
221+
batchFieldCount
222+
);
223+
binders.put(mapping.avroSchema().getValueType().getFullName(), valueBinder);
224+
}
200225
}
201226
}
202227
}
203-
204228
return binders;
205229
}
206230

@@ -214,12 +238,12 @@ private static class AvroRecordView implements Record {
214238
private final RecordBinder parentBinder;
215239

216240
AvroRecordView(GenericRecord avroRecord,
217-
org.apache.iceberg.Schema icebergSchema,
218-
TypeAdapter<Schema> typeAdapter,
219-
Map<String, Integer> fieldNameToPosition,
220-
FieldMapping[] fieldMappings,
221-
Map<String, RecordBinder> nestedStructBinders,
222-
RecordBinder parentBinder) {
241+
org.apache.iceberg.Schema icebergSchema,
242+
TypeAdapter<Schema> typeAdapter,
243+
Map<String, Integer> fieldNameToPosition,
244+
FieldMapping[] fieldMappings,
245+
Map<String, RecordBinder> nestedStructBinders,
246+
RecordBinder parentBinder) {
223247
this.avroRecord = avroRecord;
224248
this.icebergSchema = icebergSchema;
225249
this.typeAdapter = typeAdapter;
@@ -242,25 +266,11 @@ public Object get(int pos) {
242266
if (mapping == null) {
243267
return null;
244268
}
245-
246269
Object avroValue = avroRecord.get(mapping.avroPosition());
247270
if (avroValue == null) {
248271
return null;
249272
}
250-
251-
// Handle STRUCT type - delegate to nested binder
252-
if (mapping.typeId() == Type.TypeID.STRUCT) {
253-
String structId = mapping.nestedSchemaId();
254-
RecordBinder nestedBinder = nestedStructBinders.get(structId);
255-
if (nestedBinder == null) {
256-
throw new IllegalStateException("Nested binder not found for struct: " + structId);
257-
}
258-
parentBinder.addFieldCount(1);
259-
return nestedBinder.bind((GenericRecord) avroValue);
260-
}
261-
262-
// Convert non-STRUCT types
263-
Object result = typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType());
273+
Object result = convert(avroValue, mapping.avroSchema(), mapping.icebergType());
264274

265275
// Calculate and accumulate field count
266276
long fieldCount = calculateFieldCount(result, mapping.icebergType());
@@ -269,6 +279,14 @@ public Object get(int pos) {
269279
return result;
270280
}
271281

282+
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
283+
if (targetType.typeId() == Type.TypeID.STRUCT) {
284+
RecordBinder binder = nestedStructBinders.get(sourceSchema.getFullName());
285+
return binder.bind((GenericRecord) sourceValue);
286+
}
287+
return typeAdapter.convert(sourceValue, (Schema) sourceSchema, targetType, this::convert);
288+
}
289+
272290
/**
273291
* Calculates the field count for a converted value based on its size.
274292
* Large fields are counted multiple times based on the size threshold.
@@ -358,66 +376,20 @@ public <T> T get(int pos, Class<T> javaClass) {
358376
public void setField(String name, Object value) {
359377
throw new UnsupportedOperationException("Read-only");
360378
}
379+
361380
@Override
362381
public Record copy() {
363382
throw new UnsupportedOperationException("Read-only");
364383
}
384+
365385
@Override
366386
public Record copy(Map<String, Object> overwriteValues) {
367387
throw new UnsupportedOperationException("Read-only");
368388
}
389+
369390
@Override
370391
public <T> void set(int pos, T value) {
371392
throw new UnsupportedOperationException("Read-only");
372393
}
373394
}
374-
375-
// Field mapping structure
376-
private static class FieldMapping {
377-
private final int avroPosition;
378-
private final String avroKey;
379-
private final Type icebergType;
380-
private final Type.TypeID typeId;
381-
private final Schema avroSchema;
382-
private final org.apache.iceberg.Schema nestedSchema;
383-
private final String nestedSchemaId;
384-
385-
FieldMapping(int avroPosition, String avroKey, Type icebergType, Type.TypeID typeId, Schema avroSchema, org.apache.iceberg.Schema nestedSchema, String nestedSchemaId) {
386-
this.avroPosition = avroPosition;
387-
this.avroKey = avroKey;
388-
this.icebergType = icebergType;
389-
this.typeId = typeId;
390-
this.avroSchema = avroSchema;
391-
this.nestedSchema = nestedSchema;
392-
this.nestedSchemaId = nestedSchemaId;
393-
}
394-
395-
public int avroPosition() {
396-
return avroPosition;
397-
}
398-
399-
public String avroKey() {
400-
return avroKey;
401-
}
402-
403-
public Type icebergType() {
404-
return icebergType;
405-
}
406-
407-
public Type.TypeID typeId() {
408-
return typeId;
409-
}
410-
411-
public Schema avroSchema() {
412-
return avroSchema;
413-
}
414-
415-
public org.apache.iceberg.Schema nestedSchema() {
416-
return nestedSchema;
417-
}
418-
419-
public String nestedSchemaId() {
420-
return nestedSchemaId;
421-
}
422-
}
423395
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package kafka.automq.table.binder;
2+
3+
import org.apache.iceberg.types.Type;
4+
5+
@FunctionalInterface
6+
public interface StructConverter<S> {
7+
8+
Object convert(Object sourceValue, S sourceSchema, Type targetType);
9+
}

core/src/main/java/kafka/automq/table/binder/TypeAdapter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ public interface TypeAdapter<S> {
3737
*/
3838
Object convert(Object sourceValue, S sourceSchema, Type targetType);
3939

40+
41+
Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter<S> structConverter);
4042
}

0 commit comments

Comments
 (0)