@@ -56,6 +56,7 @@
import java.util.Map;
import java.util.Objects;

import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
@@ -531,7 +532,11 @@ public Object copyMessage(Object message) {
public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
throws RuleException {
try {
- return toTransformedMessage(ctx, this.rawSchema(), message, transform);
+ // Use the schema from the message if it exists, so schema evolution works properly
+ Schema schema = message instanceof GenericContainer
+     ? ((GenericContainer) message).getSchema()
+     : this.rawSchema();
+ return toTransformedMessage(ctx, schema, message, transform);
} catch (RuntimeException e) {
if (e.getCause() instanceof RuleException) {
throw (RuleException) e.getCause();
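The change above makes transformMessage walk fields using the schema the message itself carries (via GenericContainer.getSchema()) rather than the registered rawSchema(), so a record written with an evolved schema is traversed with the right field positions. A minimal standalone sketch of the failure mode it avoids (illustrative class name, not the project's API):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class WriterSchemaSketch {
      public static void main(String[] args) {
        // Registered schema: only "name".
        Schema registered = SchemaBuilder.record("Widget").fields()
            .requiredString("name")
            .endRecord();
        // Evolved writer schema: an extra leading optional field shifts positions.
        Schema writer = SchemaBuilder.record("Widget").fields()
            .optionalString("missing")
            .requiredString("name")
            .endRecord();

        GenericRecord rec = new GenericData.Record(writer);
        rec.put("name", "alice");

        // Walking the record by the registered schema's positions reads slot 0
        // expecting "name", but in the record's own schema slot 0 is the unset
        // "missing" field.
        System.out.println(rec.get(registered.getField("name").pos())); // null
        // The schema carried by the message (what GenericContainer.getSchema()
        // returns) locates the right slot.
        System.out.println(rec.get(rec.getSchema().getField("name").pos())); // alice
      }
    }

That position mismatch is the "reordered" case the new tests below cover with a GenericRecord written against a schema the generated class does not know.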
@@ -59,13 +59,15 @@
import io.confluent.kafka.schemaregistry.rules.DlqAction;
import io.confluent.kafka.schemaregistry.rules.PiiProto;
import io.confluent.kafka.schemaregistry.rules.RuleException;
import io.confluent.kafka.schemaregistry.rules.SpecificWidget;
import io.confluent.kafka.schemaregistry.rules.WidgetProto;
import io.confluent.kafka.schemaregistry.rules.WidgetProto.Widget;
import io.confluent.kafka.schemaregistry.rules.WidgetProto2;
import io.confluent.kafka.schemaregistry.rules.WidgetProto2.Widget2;
import io.confluent.kafka.schemaregistry.rules.WidgetWithRefProto.WidgetWithRef;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
@@ -102,6 +104,8 @@ public class CelExecutorTest {
private final KafkaAvroDeserializer avroDeserializer;
private final KafkaAvroSerializer avroKeySerializer;
private final KafkaAvroDeserializer avroKeyDeserializer;
private final KafkaAvroSerializer specificAvroSerializer;
private final KafkaAvroDeserializer specificAvroDeserializer;
private final KafkaAvroSerializer reflectionAvroSerializer;
private final KafkaAvroDeserializer reflectionAvroDeserializer;
private final KafkaProtobufSerializer<Widget> protobufSerializer;
@@ -166,6 +170,14 @@ public CelExecutorTest() {
avroKeyDeserializer = new KafkaAvroDeserializer(schemaRegistry);
avroKeyDeserializer.configure(defaultConfig, true);

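// Specific-Avro serde pair for the new tests: the deserializer disables
// use.latest.version and enables specific.avro.reader, so deserialized
// results come back as generated SpecificRecord classes.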
Map<String, Object> specificSerProps = new HashMap<>(defaultConfig);
specificSerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
specificAvroSerializer = new KafkaAvroSerializer(schemaRegistry, specificSerProps);
Map<String, Object> specificDeserProps = new HashMap<>(defaultConfig);
specificDeserProps.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, "false");
specificDeserProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
specificAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, specificDeserProps);

Map<String, Object> reflectionProps = new HashMap<>(defaultConfig);
reflectionProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REFLECTION_CONFIG, "true");
reflectionAvroSerializer = new KafkaAvroSerializer(schemaRegistry, reflectionProps);
@@ -774,6 +786,135 @@ public void testKafkaAvroSerializerConstraintDlqDisabled() throws Exception {
verifyNoInteractions(producer2);
}

@Test
public void testKafkaAvroSerializerSpecificFieldTransform() throws Exception {
byte[] bytes;
Object obj;

SpecificWidget widget = new SpecificWidget("alice", 5, 1);
Schema schema = widget.getSchema();
AvroSchema avroSchema = new AvroSchema(schema);
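// CEL field rule: the guard before ';' picks out string fields; the
// expression after it rewrites the field value.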
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"",
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

bytes = specificAvroSerializer.serialize(topic, widget);
obj = specificAvroDeserializer.deserialize(topic, bytes);
assertTrue(
"Returned object does not match",
SpecificWidget.class.isInstance(obj)
);
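// WRITEREAD applies the transform on both the write and the read path,
// hence the doubled suffix.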
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
}

@Test
public void testKafkaAvroSerializerSpecificFieldTransformWithMissingProp() throws Exception {
byte[] bytes;
Object obj;

String schemaStr = "{\n"
+ " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"SpecificWidget\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"string\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"size\",\n"
+ " \"type\": \"int\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"version\",\n"
+ " \"type\": \"int\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"missing\",\n"
+ " \"type\": [\"null\", \"string\"],\n"
+ " \"default\": null\n"
+ " }\n"
+ " ]\n"
+ "}";

Schema schema = new Schema.Parser().parse(schemaStr);
AvroSchema avroSchema = new AvroSchema(schema);
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"",
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

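// Build and serialize a GenericRecord against the evolved schema; the
// generated SpecificWidget class has no 'missing' field, so deserializing
// back into it exercises schema resolution.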
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "alice");
avroRecord.put("size", 5);
avroRecord.put("version", 1);
bytes = specificAvroSerializer.serialize(topic, avroRecord);
obj = specificAvroDeserializer.deserialize(topic, bytes);
assertTrue(
"Returned object does not match",
SpecificWidget.class.isInstance(obj)
);
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
}

@Test
public void testKafkaAvroSerializerSpecificFieldTransformWithMissingPropReordered()
throws Exception {
byte[] bytes;
Object obj;

String schemaStr = "{\n"
+ " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"SpecificWidget\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"missing\",\n"
+ " \"type\": [\"null\", \"string\"],\n"
+ " \"default\": null\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"string\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"size\",\n"
+ " \"type\": \"int\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"version\",\n"
+ " \"type\": \"int\"\n"
+ " }\n"
+ " ]\n"
+ "}";

Schema schema = new Schema.Parser().parse(schemaStr);
AvroSchema avroSchema = new AvroSchema(schema);
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"",
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

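// Same evolved schema but with 'missing' first, so field positions differ
// from the reader schema: the case the transformMessage change guards against.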
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "alice");
avroRecord.put("size", 5);
avroRecord.put("version", 1);
bytes = specificAvroSerializer.serialize(topic, avroRecord);
obj = specificAvroDeserializer.deserialize(topic, bytes);
assertTrue(
"Returned object does not match",
SpecificWidget.class.isInstance(obj)
);
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
}

@Test
public void testKafkaAvroSerializerReflection() throws Exception {
byte[] bytes;
@@ -1365,6 +1506,7 @@ public void testKafkaProtobufSerializer() throws Exception {
((DynamicMessage)obj).getField(dynamicDesc.findFieldByName("name"))
);
}

@Test
public void testKafkaProtobuf2Serializer() throws Exception {
byte[] bytes;