Skip to content

Commit

Permalink
Kafka Connect: Initial project setup and event data structures (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Jan 10, 2024
1 parent c6a7726 commit d1a3c10
Show file tree
Hide file tree
Showing 19 changed files with 1,529 additions and 24 deletions.
6 changes: 6 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,9 @@ AZURE:
'azure/**/*',
'azure-bundle/**/*'
]

KAFKACONNECT:
- changed-files:
- any-glob-to-any-file: [
'kafka-connect/**/*'
]
13 changes: 10 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand Down Expand Up @@ -59,7 +60,7 @@ public static Schema convert(org.apache.iceberg.Schema schema, String tableName)

public static Schema convert(
org.apache.iceberg.Schema schema, Map<Types.StructType, String> names) {
return TypeUtil.visit(schema, new TypeToSchema(names));
return TypeUtil.visit(schema, new TypeToSchema.WithTypeToName(names));
}

public static Schema convert(Type type) {
Expand All @@ -71,7 +72,12 @@ public static Schema convert(Types.StructType type, String name) {
}

public static Schema convert(Type type, Map<Types.StructType, String> names) {
return TypeUtil.visit(type, new TypeToSchema(names));
return TypeUtil.visit(type, new TypeToSchema.WithTypeToName(names));
}

public static Schema convert(
Type type, BiFunction<Integer, Types.StructType, String> namesFunction) {
return TypeUtil.visit(type, new TypeToSchema.WithNamesFunction(namesFunction));
}

public static Type convert(Schema schema) {
Expand Down Expand Up @@ -111,7 +117,8 @@ static boolean missingIds(Schema schema) {
}

public static Map<Type, Schema> convertTypes(Types.StructType type, String name) {
TypeToSchema converter = new TypeToSchema(ImmutableMap.of(type, name));
TypeToSchema.WithTypeToName converter =
new TypeToSchema.WithTypeToName(ImmutableMap.of(type, name));
TypeUtil.visit(type, converter);
return ImmutableMap.copyOf(converter.getConversionMap());
}
Expand Down
94 changes: 73 additions & 21 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand All @@ -30,7 +31,7 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN);
private static final Schema INTEGER_SCHEMA = Schema.create(Schema.Type.INT);
private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
Expand All @@ -55,15 +56,10 @@ class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
}

private final Deque<Integer> fieldIds = Lists.newLinkedList();
private final Map<Type, Schema> results = Maps.newHashMap();
private final Map<Types.StructType, String> names;
private final BiFunction<Integer, Types.StructType, String> namesFunction;

TypeToSchema(Map<Types.StructType, String> names) {
this.names = names;
}

Map<Type, Schema> getConversionMap() {
return results;
TypeToSchema(BiFunction<Integer, Types.StructType, String> namesFunction) {
this.namesFunction = namesFunction;
}

@Override
Expand All @@ -81,16 +77,29 @@ public void afterField(Types.NestedField field) {
fieldIds.pop();
}

Schema lookupSchema(Type type) {
return lookupSchema(type, null);
}

abstract Schema lookupSchema(Type type, String recordName);

void cacheSchema(Type struct, Schema schema) {
cacheSchema(struct, null, schema);
}

abstract void cacheSchema(Type struct, String recordName, Schema schema);

@Override
public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
Schema recordSchema = results.get(struct);
if (recordSchema != null) {
return recordSchema;
Integer fieldId = fieldIds.peek();
String recordName = namesFunction.apply(fieldId, struct);
if (recordName == null) {
recordName = "r" + fieldId;
}

String recordName = names.get(struct);
if (recordName == null) {
recordName = "r" + fieldIds.peek();
Schema recordSchema = lookupSchema(struct, recordName);
if (recordSchema != null) {
return recordSchema;
}

List<Types.NestedField> structFields = struct.fields();
Expand All @@ -115,7 +124,7 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {

recordSchema = Schema.createRecord(recordName, null, null, false, fields);

results.put(struct, recordSchema);
cacheSchema(struct, recordName, recordSchema);

return recordSchema;
}
Expand All @@ -131,7 +140,7 @@ public Schema field(Types.NestedField field, Schema fieldSchema) {

@Override
public Schema list(Types.ListType list, Schema elementSchema) {
Schema listSchema = results.get(list);
Schema listSchema = lookupSchema(list);
if (listSchema != null) {
return listSchema;
}
Expand All @@ -144,14 +153,14 @@ public Schema list(Types.ListType list, Schema elementSchema) {

listSchema.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, list.elementId());

results.put(list, listSchema);
cacheSchema(list, listSchema);

return listSchema;
}

@Override
public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
Schema mapSchema = results.get(map);
Schema mapSchema = lookupSchema(map);
if (mapSchema != null) {
return mapSchema;
}
Expand All @@ -173,7 +182,7 @@ public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
}

results.put(map, mapSchema);
cacheSchema(map, mapSchema);

return mapSchema;
}
Expand Down Expand Up @@ -238,8 +247,51 @@ public Schema primitive(Type.PrimitiveType primitive) {
throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
}

results.put(primitive, primitiveSchema);
cacheSchema(primitive, primitiveSchema);

return primitiveSchema;
}

static class WithTypeToName extends TypeToSchema {

private final Map<Type, Schema> results = Maps.newHashMap();

WithTypeToName(Map<Types.StructType, String> names) {
super((id, struct) -> names.get(struct));
}

Map<Type, Schema> getConversionMap() {
return results;
}

@Override
void cacheSchema(Type type, String recordName, Schema schema) {
results.put(type, schema);
}

@Override
Schema lookupSchema(Type type, String recordName) {
return results.get(type);
}
}

static class WithNamesFunction extends TypeToSchema {
private final Map<String, Schema> schemaCache = Maps.newHashMap();

WithNamesFunction(BiFunction<Integer, Types.StructType, String> namesFunction) {
super(namesFunction);
}

@Override
void cacheSchema(Type type, String recordName, Schema schema) {
if (recordName != null) {
schemaCache.put(recordName, schema);
}
}

@Override
Schema lookupSchema(Type type, String recordName) {
return recordName == null ? null : schemaCache.get(recordName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public static <T> T resolveAndRead(
return value;
}

public static void clearCache() {
DECODER_CACHES.get().clear();
}

@VisibleForTesting
static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema)
throws IOException {
Expand Down
32 changes: 32 additions & 0 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
dependencies {
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-common')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation libs.avro.avro
}

test {
useJUnitPlatform()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect.events;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.avro.DecoderResolver;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

/** Class for Avro-related utility methods. */
class AvroUtil {
static final Map<Integer, String> FIELD_ID_TO_CLASS =
ImmutableMap.of(
DataComplete.ASSIGNMENTS_ELEMENT,
TopicPartitionOffset.class.getName(),
DataFile.PARTITION_ID,
PartitionData.class.getName(),
DataWritten.TABLE_REFERENCE,
TableReference.class.getName(),
DataWritten.DATA_FILES_ELEMENT,
"org.apache.iceberg.GenericDataFile",
DataWritten.DELETE_FILES_ELEMENT,
"org.apache.iceberg.GenericDeleteFile",
CommitToTable.TABLE_REFERENCE,
TableReference.class.getName());

public static byte[] encode(Event event) {
try {
return AvroEncoderUtil.encode(event, event.getSchema());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static Event decode(byte[] bytes) {
try {
Event event = AvroEncoderUtil.decode(bytes);
// clear the cache to avoid memory leak
DecoderResolver.clearCache();
return event;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

static Schema convert(Types.StructType icebergSchema, Class<? extends IndexedRecord> javaClass) {
return convert(icebergSchema, javaClass, FIELD_ID_TO_CLASS);
}

static Schema convert(
Types.StructType icebergSchema,
Class<? extends IndexedRecord> javaClass,
Map<Integer, String> typeMap) {
return AvroSchemaUtil.convert(
icebergSchema,
(fieldId, struct) ->
struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId));
}

static int positionToId(int position, Schema avroSchema) {
List<Schema.Field> fields = avroSchema.getFields();
Preconditions.checkArgument(
position >= 0 && position < fields.size(), "Invalid field position: " + position);
Object val = fields.get(position).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
return val == null ? -1 : (int) val;
}

private AvroUtil() {}
}
Loading

0 comments on commit d1a3c10

Please sign in to comment.