Skip to content

Commit dd64cab

Browse files
authored
DGS-22077 Ensure tags are used during schema evolution (#3890) (#3892)
1 parent cfc5e94 commit dd64cab

File tree

2 files changed

+62
-9
lines changed

2 files changed

+62
-9
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -561,10 +561,7 @@ public Object copyMessage(Object message) {
561561
public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
562562
throws RuleException {
563563
try {
564-
// Use the schema from the message if it exists, so schema evolution works properly
565-
Schema schema = message instanceof GenericContainer
566-
? ((GenericContainer) message).getSchema()
567-
: this.rawSchema();
564+
Schema schema = this.rawSchema();
568565
return toTransformedMessage(ctx, schema, message, transform);
569566
} catch (RuntimeException e) {
570567
if (e.getCause() instanceof RuleException) {
@@ -613,16 +610,27 @@ private Object toTransformedMessage(
613610
if (message == null) {
614611
return null;
615612
}
616-
data = AvroSchemaUtils.getData(schema, message, false, false);
617-
for (Schema.Field f : schema.getFields()) {
618-
String fullName = schema.getFullName() + "." + f.name();
613+
Schema recordSchema = schema;
614+
if (message instanceof GenericContainer) {
615+
// Use the schema from the message if it exists, so schema evolution works properly
616+
recordSchema = ((GenericContainer) message).getSchema();
617+
}
618+
data = AvroSchemaUtils.getData(recordSchema, message, false, false);
619+
for (Schema.Field f : recordSchema.getFields()) {
620+
// The original field has tags needed for inline tag matching
621+
Schema.Field originalField = schema.getField(f.name());
622+
if (originalField == null) {
623+
originalField = f;
624+
}
625+
String fullName = recordSchema.getFullName() + "." + f.name();
619626
try (FieldContext fc = ctx.enterField(
620-
message, fullName, f.name(), getType(f.schema()), getInlineTags(f))) {
627+
message, fullName, f.name(),
628+
getType(originalField.schema()), getInlineTags(originalField))) {
621629
Object value = data.getField(message, f.name(), f.pos());
622630
if (value instanceof Utf8) {
623631
value = value.toString();
624632
}
625-
Object newValue = toTransformedMessage(ctx, f.schema(), value, transform);
633+
Object newValue = toTransformedMessage(ctx, originalField.schema(), value, transform);
626634
if (ctx.rule().getKind() == RuleKind.CONDITION) {
627635
if (Boolean.FALSE.equals(newValue)) {
628636
throw new RuntimeException(new RuleConditionException(ctx.rule()));

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,51 @@ public void testKafkaAvroSerializerSpecificFieldTransform() throws Exception {
810810
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
811811
}
812812

813+
@Test
814+
public void testKafkaAvroSerializerSpecificFieldTransformWithTag() throws Exception {
815+
byte[] bytes;
816+
Object obj;
817+
818+
String schemaStr = "{\n"
819+
+ " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n"
820+
+ " \"type\": \"record\",\n"
821+
+ " \"name\": \"SpecificWidget\",\n"
822+
+ " \"fields\": [\n"
823+
+ " {\n"
824+
+ " \"name\": \"name\",\n"
825+
+ " \"type\": \"string\",\n"
826+
+ " \"confluent:tags\": [ \"PII\" ]\n"
827+
+ " },\n"
828+
+ " {\n"
829+
+ " \"name\": \"size\",\n"
830+
+ " \"type\": \"int\"\n"
831+
+ " },\n"
832+
+ " {\n"
833+
+ " \"name\": \"version\",\n"
834+
+ " \"type\": \"int\"\n"
835+
+ " }\n"
836+
+ " ]\n"
837+
+ "}";
838+
839+
Schema schema = new Schema.Parser().parse(schemaStr);
840+
AvroSchema avroSchema = new AvroSchema(schema);
841+
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
842+
CelFieldExecutor.TYPE, Collections.singleton("PII"), null, "value + \"-suffix\"",
843+
null, null, false);
844+
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
845+
avroSchema = avroSchema.copy(null, ruleSet);
846+
schemaRegistry.register(topic + "-value", avroSchema);
847+
848+
SpecificWidget widget = new SpecificWidget("alice", 5, 1);
849+
bytes = specificAvroSerializer.serialize(topic, widget);
850+
obj = specificAvroDeserializer.deserialize(topic, bytes);
851+
assertTrue(
852+
"Returned object does not match",
853+
SpecificWidget.class.isInstance(obj)
854+
);
855+
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
856+
}
857+
813858
@Test
814859
public void testKafkaAvroSerializerSpecificFieldTransformWithMissingProp() throws Exception {
815860
byte[] bytes;

0 commit comments

Comments
 (0)