Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
*/
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {


@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
@Override
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
public Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter<S> structConverter) {
if (sourceValue == null) {
return null;
}
Expand Down Expand Up @@ -83,9 +84,11 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
case TIMESTAMP:
return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
case LIST:
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType, structConverter);
case MAP:
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType, structConverter);
case STRUCT:
return structConverter.convert(sourceValue, sourceSchema, targetType);
default:
return sourceValue;
}
Expand Down Expand Up @@ -203,10 +206,13 @@ protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.Time
Instant instant = Instant.parse(sourceValue.toString());
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
}
if (sourceValue instanceof Number) {
return DateTimeUtil.timestamptzFromMicros(((Number) sourceValue).longValue());
}
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);
protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType, StructConverter<S> structConverter);

protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType, StructConverter<S> structConverter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
}

@Override
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType, StructConverter<Schema> structConverter) {
Schema listSchema = sourceSchema;
Schema elementSchema = listSchema.getElementType();

Expand All @@ -131,14 +131,14 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis

List<Object> list = new ArrayList<>(sourceList.size());
for (Object element : sourceList) {
Object convert = convert(element, elementSchema, targetType.elementType());
Object convert = convert(element, elementSchema, targetType.elementType(), structConverter);
list.add(convert);
}
return list;
}

@Override
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType, StructConverter<Schema> structConverter) {
if (sourceValue instanceof GenericData.Array) {
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());
Expand All @@ -161,8 +161,8 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
continue;
}
GenericRecord record = (GenericRecord) element;
Object key = convert(record.get(keyField.pos()), keySchema, keyType);
Object value = convert(record.get(valueField.pos()), valueSchema, valueType);
Object key = convert(record.get(keyField.pos()), keySchema, keyType, structConverter);
Object value = convert(record.get(valueField.pos()), valueSchema, valueType, structConverter);
recordMap.put(key, value);
}
return recordMap;
Expand All @@ -179,10 +179,32 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis

for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
Object rawKey = entry.getKey();
Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType);
Object value = convert(entry.getValue(), valueSchema, valueType);
Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType, structConverter);
Object value = convert(entry.getValue(), valueSchema, valueType, structConverter);
adaptedMap.put(key, value);
}
return adaptedMap;
}

@Override
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
    // Delegate to the struct-aware overload, using this adapter's own
    // struct conversion as the StructConverter callback.
    StructConverter<Schema> structConverter = this::convertStruct;
    return convert(sourceValue, sourceSchema, targetType, structConverter);
}

/**
 * Converts an Avro {@link GenericRecord} into an Iceberg generic record matching
 * the target struct type. Each Iceberg column is looked up by name in the Avro
 * source schema and its value is converted recursively.
 *
 * @param sourceValue  the Avro record to convert; must be a {@link GenericRecord}
 * @param sourceSchema the Avro schema describing {@code sourceValue}
 * @param targetType   the Iceberg struct type to produce
 * @return an {@code org.apache.iceberg.data.GenericRecord} populated field by field
 * @throws IllegalArgumentException if {@code sourceValue} is not a {@link GenericRecord}
 * @throws IllegalStateException    if a target field is absent from the source schema
 */
protected Object convertStruct(Object sourceValue, Schema sourceSchema, Type targetType) {
    // Fail fast with a descriptive message (matching convertTimestamp's error style)
    // instead of a raw ClassCastException inside the loop.
    if (!(sourceValue instanceof GenericRecord)) {
        throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName()
            + " to " + targetType.typeId());
    }
    // The cast is loop-invariant; hoist it out of the per-field loop.
    GenericRecord record = (GenericRecord) sourceValue;
    org.apache.iceberg.Schema schema = targetType.asStructType().asSchema();
    org.apache.iceberg.data.GenericRecord result = org.apache.iceberg.data.GenericRecord.create(schema);
    for (Types.NestedField f : schema.columns()) {
        Schema.Field sourceField = sourceSchema.getField(f.name());
        if (sourceField == null) {
            throw new IllegalStateException("Missing field '" + f.name()
                + "' in source schema: " + sourceSchema.getFullName());
        }
        // Recursively convert the field value to the expected Iceberg type.
        Object fieldValue = convert(record.get(f.name()), sourceField.schema(), f.type());
        result.setField(f.name(), fieldValue);
    }
    return result;
}
}
57 changes: 57 additions & 0 deletions core/src/main/java/kafka/automq/table/binder/FieldMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.automq.table.binder;

import org.apache.avro.Schema;
import org.apache.iceberg.types.Type;

/**
 * Immutable value object pairing an Avro field with its Iceberg counterpart.
 * Captures the Avro field's position, key name, and schema together with the
 * Iceberg type, so record binding can convert field values without re-resolving
 * the mapping on every access.
 */
public class FieldMapping {
    private final int avroPosition;
    private final String avroKey;
    private final Type icebergType;
    private final Schema avroSchema;

    /**
     * Creates a mapping entry.
     *
     * @param avroPosition zero-based position of the field in the Avro record
     * @param avroKey      the Avro field name used for lookups
     * @param icebergType  the Iceberg type the field converts to
     * @param avroSchema   the Avro schema of the source field
     */
    public FieldMapping(int avroPosition, String avroKey, Type icebergType, Schema avroSchema) {
        this.avroPosition = avroPosition;
        this.avroKey = avroKey;
        this.icebergType = icebergType;
        this.avroSchema = avroSchema;
    }

    /** @return zero-based position of the field in the Avro record */
    public int avroPosition() {
        return avroPosition;
    }

    /** @return the Avro field name */
    public String avroKey() {
        return avroKey;
    }

    /** @return the Iceberg type this field maps to */
    public Type icebergType() {
        return icebergType;
    }

    /** @return the Avro schema of the source field */
    public Schema avroSchema() {
        return avroSchema;
    }
}
Loading
Loading