Commit 136d326
fix(adapter): update list and map conversion methods to include StructConverter
1 parent 0a3f410 commit 136d326

File tree: 4 files changed, +84 -34 lines changed

core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java

Lines changed: 4 additions & 4 deletions
@@ -84,9 +84,9 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType, Struc
             case TIMESTAMP:
                 return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
             case LIST:
-                return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
+                return convertList(sourceValue, sourceSchema, (Types.ListType) targetType, structConverter);
             case MAP:
-                return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
+                return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType, structConverter);
             case STRUCT:
                 return structConverter.convert(sourceValue, sourceSchema, targetType);
             default:
@@ -212,7 +212,7 @@ protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.Time
        throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
    }
 
-    protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);
+    protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType, StructConverter<S> structConverter);
 
-    protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
+    protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType, StructConverter<S> structConverter);
 }
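Why the new parameter matters: convert() recurses into LIST and MAP element types, and before this change the recursion dropped the caller-supplied StructConverter, so a struct nested inside a list or map could never reach structConverter.convert. A minimal, self-contained sketch of the pattern (hypothetical simplified types for illustration; the real AbstractTypeAdapter is generic over a source schema type S and uses Iceberg's Types):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not the project's actual classes.
interface StructConverter<S> {
    Object convert(Object value, S schema, TypeSketch type);
}

// Toy version of an Iceberg-style type: a primitive, a struct, or a list.
record TypeSketch(Kind kind, TypeSketch elementType) {
    enum Kind { PRIMITIVE, STRUCT, LIST }
}

final class AdapterSketch<S> {
    Object convert(Object value, S schema, TypeSketch type, StructConverter<S> structConverter) {
        switch (type.kind()) {
            case STRUCT:
                // Struct delegation already worked before this commit.
                return structConverter.convert(value, schema, type);
            case LIST: {
                // The fix: thread structConverter through the recursion so
                // elements that are themselves structs can still be converted.
                List<Object> out = new ArrayList<>();
                for (Object element : (List<?>) value) {
                    out.add(convert(element, schema, type.elementType(), structConverter));
                }
                return out;
            }
            default:
                return value; // primitives pass through in this sketch
        }
    }
}

Passing the converter as an argument rather than storing it on the adapter also keeps the adapter stateless, so one instance can serve callers with different struct-conversion strategies; the abstract method signatures above suggest that is the intent.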

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 7 additions & 7 deletions
@@ -116,7 +116,7 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
     }
 
     @Override
-    protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
+    protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType, StructConverter<Schema> structConverter) {
        Schema listSchema = sourceSchema;
        Schema elementSchema = listSchema.getElementType();
 
@@ -131,14 +131,14 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
 
        List<Object> list = new ArrayList<>(sourceList.size());
        for (Object element : sourceList) {
-            Object convert = convert(element, elementSchema, targetType.elementType());
+            Object convert = convert(element, elementSchema, targetType.elementType(), structConverter);
            list.add(convert);
        }
        return list;
    }
 
    @Override
-    protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
+    protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType, StructConverter<Schema> structConverter) {
        if (sourceValue instanceof GenericData.Array) {
            GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
            Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());
@@ -161,8 +161,8 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
                continue;
            }
            GenericRecord record = (GenericRecord) element;
-            Object key = convert(record.get(keyField.pos()), keySchema, keyType);
-            Object value = convert(record.get(valueField.pos()), valueSchema, valueType);
+            Object key = convert(record.get(keyField.pos()), keySchema, keyType, structConverter);
+            Object value = convert(record.get(valueField.pos()), valueSchema, valueType, structConverter);
            recordMap.put(key, value);
        }
        return recordMap;
@@ -179,8 +179,8 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
 
        for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
            Object rawKey = entry.getKey();
-            Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType);
-            Object value = convert(entry.getValue(), valueSchema, valueType);
+            Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType, structConverter);
+            Object value = convert(entry.getValue(), valueSchema, valueType, structConverter);
            adaptedMap.put(key, value);
        }
        return adaptedMap;
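A note on convertMap's two branches, visible in the hunks above: a plain Avro map arrives as a java.util.Map with string keys (hence STRING_SCHEMA_INSTANCE for raw keys), while a map whose Avro schema was derived from Protobuf typically surfaces as a GenericData.Array of {key, value} records, which is why the other branch reads keyField.pos() and valueField.pos(). A small sketch of the two source shapes, with hypothetical schemas for illustration:

import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

public final class MapShapes {
    public static void main(String[] args) {
        // Shape 1: an array of {key, value} records, the form Protobuf-derived
        // Avro schemas commonly use for maps.
        Schema entry = SchemaBuilder.record("MapEntry").fields()
            .requiredString("key")
            .requiredInt("value")
            .endRecord();
        Schema arraySchema = Schema.createArray(entry);
        GenericData.Array<Object> asArray = new GenericData.Array<>(1, arraySchema);
        asArray.add(new GenericRecordBuilder(entry).set("key", "a").set("value", 1).build());

        // Shape 2: a plain Avro map, which lands as java.util.Map with string keys.
        Map<String, Integer> asMap = Map.of("a", 1);

        System.out.println(asArray + " vs " + asMap);
    }
}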

core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java

Lines changed: 1 addition & 2 deletions
@@ -20,7 +20,6 @@
 package kafka.automq.table.binder;
 
 import com.google.common.collect.ImmutableMap;
-
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -122,7 +121,7 @@ private void testSendRecord(org.apache.iceberg.Schema schema, Record record) {
        }
    }
 
-    private TaskWriter<Record> createTableWriter(Table table) {
+    public static TaskWriter<Record> createTableWriter(Table table) {
        FileAppenderFactory<Record> appenderFactory = new GenericAppenderFactory(
            table.schema(),
            table.spec(),
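The only functional change in this file is widening createTableWriter from private to public static so other tests can reuse it; the Protobuf test below imports it statically. Roughly how a caller uses the shared helper (a sketch mirroring the testSendRecord helper added in the next file):

import static kafka.automq.table.binder.AvroRecordBinderTest.createTableWriter;

import java.io.IOException;

import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;

final class WriterReuseSketch {
    // Write a single record through the shared test helper; the checked
    // IOException is wrapped, as in the new testSendRecord helper.
    static void writeOne(Table table, Record record) {
        TaskWriter<Record> writer = createTableWriter(table);
        try {
            writer.write(record);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}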

core/src/test/java/kafka/automq/table/process/convert/ProtobufRegistryConverterTest.java

Lines changed: 72 additions & 21 deletions
@@ -1,34 +1,41 @@
 package kafka.automq.table.process.convert;
 
-import kafka.automq.table.deserializer.proto.CustomProtobufSchema;
-import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
-import kafka.automq.table.deserializer.proto.parse.ProtobufSchemaParser;
-import kafka.automq.table.deserializer.proto.parse.converter.ProtoConstants;
-import kafka.automq.table.deserializer.proto.schema.DynamicSchema;
-import kafka.automq.table.process.ConversionResult;
-
-import org.apache.kafka.common.utils.ByteUtils;
-
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Timestamp;
 import com.squareup.wire.schema.internal.parser.ProtoFileElement;
 import com.squareup.wire.schema.internal.parser.ProtoParser;
-
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import kafka.automq.table.binder.RecordBinder;
+import kafka.automq.table.deserializer.proto.CustomProtobufSchema;
+import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
+import kafka.automq.table.deserializer.proto.parse.ProtobufSchemaParser;
+import kafka.automq.table.deserializer.proto.parse.converter.ProtoConstants;
+import kafka.automq.table.deserializer.proto.schema.DynamicSchema;
+import kafka.automq.table.process.ConversionResult;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-
+import static kafka.automq.table.binder.AvroRecordBinderTest.createTableWriter;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -37,21 +44,29 @@ public class ProtobufRegistryConverterTest {
 
     private static final String ALL_TYPES_PROTO = """
             syntax = \"proto3\";
-
+
             package kafka.automq.table.process.proto;
-
+
             import \"google/protobuf/timestamp.proto\";
-
+
             message Nested {
               string name = 1;
               int32 count = 2;
             }
-
+
             enum SampleEnum {
              SAMPLE_ENUM_UNSPECIFIED = 0;
              SAMPLE_ENUM_SECOND = 1;
             }
-
+
+            message FloatArray {
+              repeated double values = 1;
+            }
+
+            message StringArray {
+              repeated string values = 1;
+            }
+
             message AllTypes {
              // Scalar primitives in order defined by Avro ProtobufData mapping
              bool f_bool = 1;
@@ -79,12 +94,28 @@ enum SampleEnum {
              oneof choice {
                string choice_str = 22;
                int32 choice_int = 23;
+                FloatArray choice_float_array = 26;
+                StringArray choice_string_array = 27;
              }
              repeated Nested f_nested_list = 24;
              map<string, Nested> f_string_nested_map = 25;
             }
             """;
 
+    private void testSendRecord(org.apache.iceberg.Schema schema, org.apache.iceberg.data.Record record) {
+        InMemoryCatalog catalog = new InMemoryCatalog();
+        catalog.initialize("test", ImmutableMap.of());
+        catalog.createNamespace(Namespace.of("default"));
+        String tableName = "test";
+        Table table = catalog.createTable(TableIdentifier.of(Namespace.of("default"), tableName), schema);
+        TaskWriter<org.apache.iceberg.data.Record> writer = createTableWriter(table);
+        try {
+            writer.write(record);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Test
     void testConvertAllPrimitiveAndCollectionTypes() throws Exception {
         String topic = "pb-all-types";
@@ -108,7 +139,7 @@ void testConvertAllPrimitiveAndCollectionTypes() throws Exception {
 
        DynamicMessage message = buildAllTypesMessage(descriptor);
        // magic byte + schema id + single message index + serialized protobuf payload
-        ByteBuffer payload = buildConfluentPayload(schemaId, message.toByteArray(), 1);
+        ByteBuffer payload = buildConfluentPayload(schemaId, message.toByteArray(), 3);
 
        ProtobufRegistryConverter converter = new ProtobufRegistryConverter(registryClient, "http://mock:8081", false);
 
@@ -121,6 +152,11 @@ void testConvertAllPrimitiveAndCollectionTypes() throws Exception {
        assertPrimitiveFields(record);
        assertRepeatedAndMapFields(record);
        assertNestedAndTimestamp(record);
+
+        org.apache.iceberg.Schema iceberg = AvroSchemaUtil.toIceberg(record.getSchema());
+        RecordBinder recordBinder = new RecordBinder(iceberg, record.getSchema());
+        Record bind = recordBinder.bind(record);
+        testSendRecord(iceberg, bind);
    }
 
    private static DynamicMessage buildAllTypesMessage(Descriptors.Descriptor descriptor) {
@@ -145,7 +181,16 @@ private static DynamicMessage buildAllTypesMessage(Descriptors.Descriptor descri
            descriptor.findFieldByName("f_enum"),
            descriptor.getFile().findEnumTypeByName("SampleEnum").findValueByName("SAMPLE_ENUM_SECOND")
        );
-        builder.setField(descriptor.findFieldByName("choice_str"), "choice-string");
+
+        // Build FloatArray for oneof choice
+        Descriptors.FieldDescriptor floatArrayField = descriptor.findFieldByName("choice_float_array");
+        Descriptors.Descriptor floatArrayDescriptor = floatArrayField.getMessageType();
+        DynamicMessage.Builder floatArrayBuilder = DynamicMessage.newBuilder(floatArrayDescriptor);
+        Descriptors.FieldDescriptor floatValuesField = floatArrayDescriptor.findFieldByName("values");
+        floatArrayBuilder.addRepeatedField(floatValuesField, 1.1);
+        floatArrayBuilder.addRepeatedField(floatValuesField, 2.2);
+        floatArrayBuilder.addRepeatedField(floatValuesField, 3.3);
+        builder.setField(floatArrayField, floatArrayBuilder.build());
 
        Descriptors.FieldDescriptor nestedField = descriptor.findFieldByName("f_message");
        Descriptors.Descriptor nestedDescriptor = nestedField.getMessageType();
@@ -286,8 +331,14 @@ private static void assertNestedAndTimestamp(GenericRecord record) {
        // Optional field should fall back to proto3 default (empty string)
        assertEquals("", getField(record, "f_optional_string", "fOptionalString").toString());
 
-        Object oneofValue = getField(record, "choice_str", "choiceStr");
-        assertEquals("choice-string", oneofValue.toString());
+        // Verify oneof with complex FloatArray type
+        GenericRecord floatArrayValue = (GenericRecord) getField(record, "choice_float_array", "floatArray");
+        List<?> floatValues = (List<?>) floatArrayValue.get("values");
+        List<Double> expectedFloats = List.of(1.1, 2.2, 3.3);
+        assertEquals(expectedFloats.size(), floatValues.size());
+        for (int i = 0; i < expectedFloats.size(); i++) {
+            assertEquals(expectedFloats.get(i), (Double) floatValues.get(i), 1e-6);
+        }
    }
 
    private static Object getField(GenericRecord record, String... candidateNames) {
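Two details of the updated test worth spelling out. First, the message-index argument to buildConfluentPayload changes from 1 to 3 because indexes count messages in declaration order, and the new FloatArray and StringArray messages now sit between Nested and AllTypes, shifting AllTypes from index 1 to index 3. Second, after the existing field assertions, the decoded Avro record is bound to an Iceberg schema and written into an in-memory catalog table, exercising the StructConverter threading end to end: the oneof now carries a struct with a repeated field, which must survive the binder and the writer. A plausible sketch of the payload layout the helper assembles (buildConfluentPayload's body is not part of this diff, so the exact encoding here is an assumption):

import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.ByteUtils;

final class ConfluentPayloadSketch {
    // Assumed layout, matching the in-test comment:
    // magic byte + schema id + message index path + serialized protobuf payload.
    static ByteBuffer build(int schemaId, byte[] protoBytes, int messageIndex) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 10 + protoBytes.length);
        buf.put((byte) 0);                         // magic byte
        buf.putInt(schemaId);                      // 4-byte schema registry id
        ByteUtils.writeVarint(1, buf);             // path length: one index
        ByteUtils.writeVarint(messageIndex, buf);  // 3 = AllTypes after Nested, FloatArray, StringArray
        buf.put(protoBytes);
        buf.flip();
        return buf;
    }
}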
