Skip to content

Commit eba5f16

Browse files
Merge branch '7.6.x' into 7.7.x by rayokota
2 parents 609aedd + 58a29ef commit eba5f16

File tree

2 files changed

+148
-1
lines changed

2 files changed

+148
-1
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.stream.StreamSupport;
5858
import org.apache.avro.Schema;
5959
import org.apache.avro.SchemaCompatibility;
60+
import org.apache.avro.generic.GenericContainer;
6061
import org.apache.avro.generic.GenericData;
6162
import org.apache.avro.util.Utf8;
6263
import org.slf4j.Logger;
@@ -559,7 +560,11 @@ public Object copyMessage(Object message) {
559560
public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
560561
throws RuleException {
561562
try {
562-
return toTransformedMessage(ctx, this.rawSchema(), message, transform);
563+
// Use the schema from the message if it exists, so schema evolution works properly
564+
Schema schema = message instanceof GenericContainer
565+
? ((GenericContainer) message).getSchema()
566+
: this.rawSchema();
567+
return toTransformedMessage(ctx, schema, message, transform);
563568
} catch (RuntimeException e) {
564569
if (e.getCause() instanceof RuleException) {
565570
throw (RuleException) e.getCause();

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,15 @@
5959
import io.confluent.kafka.schemaregistry.rules.DlqAction;
6060
import io.confluent.kafka.schemaregistry.rules.PiiProto;
6161
import io.confluent.kafka.schemaregistry.rules.RuleException;
62+
import io.confluent.kafka.schemaregistry.rules.SpecificWidget;
6263
import io.confluent.kafka.schemaregistry.rules.WidgetProto;
6364
import io.confluent.kafka.schemaregistry.rules.WidgetProto.Widget;
6465
import io.confluent.kafka.schemaregistry.rules.WidgetProto2;
6566
import io.confluent.kafka.schemaregistry.rules.WidgetProto2.Widget2;
6667
import io.confluent.kafka.schemaregistry.rules.WidgetWithRefProto.WidgetWithRef;
6768
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
6869
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
70+
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
6971
import io.confluent.kafka.serializers.KafkaAvroSerializer;
7072
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
7173
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
@@ -102,6 +104,8 @@ public class CelExecutorTest {
102104
private final KafkaAvroDeserializer avroDeserializer;
103105
private final KafkaAvroSerializer avroKeySerializer;
104106
private final KafkaAvroDeserializer avroKeyDeserializer;
107+
private final KafkaAvroSerializer specificAvroSerializer;
108+
private final KafkaAvroDeserializer specificAvroDeserializer;
105109
private final KafkaAvroSerializer reflectionAvroSerializer;
106110
private final KafkaAvroDeserializer reflectionAvroDeserializer;
107111
private final KafkaProtobufSerializer<Widget> protobufSerializer;
@@ -166,6 +170,14 @@ public CelExecutorTest() {
166170
avroKeyDeserializer = new KafkaAvroDeserializer(schemaRegistry);
167171
avroKeyDeserializer.configure(defaultConfig, true);
168172

173+
Map<String, Object> specificSerProps = new HashMap<>(defaultConfig);
174+
specificSerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
175+
specificAvroSerializer = new KafkaAvroSerializer(schemaRegistry, specificSerProps);
176+
Map<String, Object> specificDeserProps = new HashMap<>(defaultConfig);
177+
specificDeserProps.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, "false");
178+
specificDeserProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
179+
specificAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, specificDeserProps);
180+
169181
Map<String, Object> reflectionProps = new HashMap<>(defaultConfig);
170182
reflectionProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REFLECTION_CONFIG, "true");
171183
reflectionAvroSerializer = new KafkaAvroSerializer(schemaRegistry, reflectionProps);
@@ -774,6 +786,135 @@ public void testKafkaAvroSerializerConstraintDlqDisabled() throws Exception {
774786
verifyNoInteractions(producer2);
775787
}
776788

789+
@Test
790+
public void testKafkaAvroSerializerSpecificFieldTransform() throws Exception {
791+
byte[] bytes;
792+
Object obj;
793+
794+
SpecificWidget widget = new SpecificWidget("alice", 5, 1);
795+
Schema schema = widget.getSchema();
796+
AvroSchema avroSchema = new AvroSchema(schema);
797+
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
798+
CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"",
799+
null, null, false);
800+
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
801+
avroSchema = avroSchema.copy(null, ruleSet);
802+
schemaRegistry.register(topic + "-value", avroSchema);
803+
804+
bytes = specificAvroSerializer.serialize(topic, widget);
805+
obj = specificAvroDeserializer.deserialize(topic, bytes);
806+
assertTrue(
807+
"Returned object does not match",
808+
SpecificWidget.class.isInstance(obj)
809+
);
810+
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
811+
}
812+
813+
@Test
814+
public void testKafkaAvroSerializerSpecificFieldTransformWithMissingProp() throws Exception {
815+
byte[] bytes;
816+
Object obj;
817+
818+
String schemaStr = "{\n"
819+
+ " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n"
820+
+ " \"type\": \"record\",\n"
821+
+ " \"name\": \"SpecificWidget\",\n"
822+
+ " \"fields\": [\n"
823+
+ " {\n"
824+
+ " \"name\": \"name\",\n"
825+
+ " \"type\": \"string\"\n"
826+
+ " },\n"
827+
+ " {\n"
828+
+ " \"name\": \"size\",\n"
829+
+ " \"type\": \"int\"\n"
830+
+ " },\n"
831+
+ " {\n"
832+
+ " \"name\": \"version\",\n"
833+
+ " \"type\": \"int\"\n"
834+
+ " },\n"
835+
+ " {\n"
836+
+ " \"name\": \"missing\",\n"
837+
+ " \"type\": [\"null\", \"string\"],\n"
838+
+ " \"default\": null\n"
839+
+ " }\n"
840+
+ " ]\n"
841+
+ "}";
842+
843+
Schema schema = new Schema.Parser().parse(schemaStr);
844+
AvroSchema avroSchema = new AvroSchema(schema);
845+
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
846+
CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"",
847+
null, null, false);
848+
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
849+
avroSchema = avroSchema.copy(null, ruleSet);
850+
schemaRegistry.register(topic + "-value", avroSchema);
851+
852+
GenericRecord avroRecord = new GenericData.Record(schema);
853+
avroRecord.put("name", "alice");
854+
avroRecord.put("size", 5);
855+
avroRecord.put("version", 1);
856+
bytes = specificAvroSerializer.serialize(topic, avroRecord);
857+
obj = specificAvroDeserializer.deserialize(topic, bytes);
858+
assertTrue(
859+
"Returned object does not match",
860+
SpecificWidget.class.isInstance(obj)
861+
);
862+
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
863+
}
864+
865+
@Test
866+
public void testKafkaAvroSerializerSpecificFieldTransformWithMissingPropReordered()
867+
throws Exception {
868+
byte[] bytes;
869+
Object obj;
870+
871+
String schemaStr = "{\n"
872+
+ " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n"
873+
+ " \"type\": \"record\",\n"
874+
+ " \"name\": \"SpecificWidget\",\n"
875+
+ " \"fields\": [\n"
876+
+ " {\n"
877+
+ " \"name\": \"missing\",\n"
878+
+ " \"type\": [\"null\", \"string\"],\n"
879+
+ " \"default\": null\n"
880+
+ " },\n"
881+
+ " {\n"
882+
+ " \"name\": \"name\",\n"
883+
+ " \"type\": \"string\"\n"
884+
+ " },\n"
885+
+ " {\n"
886+
+ " \"name\": \"size\",\n"
887+
+ " \"type\": \"int\"\n"
888+
+ " },\n"
889+
+ " {\n"
890+
+ " \"name\": \"version\",\n"
891+
+ " \"type\": \"int\"\n"
892+
+ " }\n"
893+
+ " ]\n"
894+
+ "}";
895+
896+
Schema schema = new Schema.Parser().parse(schemaStr);
897+
AvroSchema avroSchema = new AvroSchema(schema);
898+
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
899+
CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"",
900+
null, null, false);
901+
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
902+
avroSchema = avroSchema.copy(null, ruleSet);
903+
schemaRegistry.register(topic + "-value", avroSchema);
904+
905+
GenericRecord avroRecord = new GenericData.Record(schema);
906+
avroRecord.put("name", "alice");
907+
avroRecord.put("size", 5);
908+
avroRecord.put("version", 1);
909+
bytes = specificAvroSerializer.serialize(topic, avroRecord);
910+
obj = specificAvroDeserializer.deserialize(topic, bytes);
911+
assertTrue(
912+
"Returned object does not match",
913+
SpecificWidget.class.isInstance(obj)
914+
);
915+
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
916+
}
917+
777918
@Test
778919
public void testKafkaAvroSerializerReflection() throws Exception {
779920
byte[] bytes;
@@ -1365,6 +1506,7 @@ public void testKafkaProtobufSerializer() throws Exception {
13651506
((DynamicMessage)obj).getField(dynamicDesc.findFieldByName("name"))
13661507
);
13671508
}
1509+
13681510
@Test
13691511
public void testKafkaProtobuf2Serializer() throws Exception {
13701512
byte[] bytes;

0 commit comments

Comments
 (0)