|
59 | 59 | import io.confluent.kafka.schemaregistry.rules.DlqAction; |
60 | 60 | import io.confluent.kafka.schemaregistry.rules.PiiProto; |
61 | 61 | import io.confluent.kafka.schemaregistry.rules.RuleException; |
| 62 | +import io.confluent.kafka.schemaregistry.rules.SpecificWidget; |
62 | 63 | import io.confluent.kafka.schemaregistry.rules.WidgetProto; |
63 | 64 | import io.confluent.kafka.schemaregistry.rules.WidgetProto.Widget; |
64 | 65 | import io.confluent.kafka.schemaregistry.rules.WidgetProto2; |
65 | 66 | import io.confluent.kafka.schemaregistry.rules.WidgetProto2.Widget2; |
66 | 67 | import io.confluent.kafka.schemaregistry.rules.WidgetWithRefProto.WidgetWithRef; |
67 | 68 | import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; |
68 | 69 | import io.confluent.kafka.serializers.KafkaAvroDeserializer; |
| 70 | +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; |
69 | 71 | import io.confluent.kafka.serializers.KafkaAvroSerializer; |
70 | 72 | import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; |
71 | 73 | import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; |
@@ -102,6 +104,8 @@ public class CelExecutorTest { |
102 | 104 | private final KafkaAvroDeserializer avroDeserializer; |
103 | 105 | private final KafkaAvroSerializer avroKeySerializer; |
104 | 106 | private final KafkaAvroDeserializer avroKeyDeserializer; |
| 107 | + private final KafkaAvroSerializer specificAvroSerializer; |
| 108 | + private final KafkaAvroDeserializer specificAvroDeserializer; |
105 | 109 | private final KafkaAvroSerializer reflectionAvroSerializer; |
106 | 110 | private final KafkaAvroDeserializer reflectionAvroDeserializer; |
107 | 111 | private final KafkaProtobufSerializer<Widget> protobufSerializer; |
@@ -166,6 +170,14 @@ public CelExecutorTest() { |
166 | 170 | avroKeyDeserializer = new KafkaAvroDeserializer(schemaRegistry); |
167 | 171 | avroKeyDeserializer.configure(defaultConfig, true); |
168 | 172 |
|
| 173 | + Map<String, Object> specificSerProps = new HashMap<>(defaultConfig); |
| 174 | + specificSerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); |
| 175 | + specificAvroSerializer = new KafkaAvroSerializer(schemaRegistry, specificSerProps); |
| 176 | + Map<String, Object> specificDeserProps = new HashMap<>(defaultConfig); |
| 177 | + specificDeserProps.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, "false"); |
| 178 | + specificDeserProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); |
| 179 | + specificAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, specificDeserProps); |
| 180 | + |
169 | 181 | Map<String, Object> reflectionProps = new HashMap<>(defaultConfig); |
170 | 182 | reflectionProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REFLECTION_CONFIG, "true"); |
171 | 183 | reflectionAvroSerializer = new KafkaAvroSerializer(schemaRegistry, reflectionProps); |
@@ -774,6 +786,135 @@ public void testKafkaAvroSerializerConstraintDlqDisabled() throws Exception { |
774 | 786 | verifyNoInteractions(producer2); |
775 | 787 | } |
776 | 788 |
|
| 789 | + @Test |
| 790 | + public void testKafkaAvroSerializerSpecificFieldTransform() throws Exception { |
| 791 | + byte[] bytes; |
| 792 | + Object obj; |
| 793 | + |
| 794 | + SpecificWidget widget = new SpecificWidget("alice", 5, 1); |
| 795 | + Schema schema = widget.getSchema(); |
| 796 | + AvroSchema avroSchema = new AvroSchema(schema); |
| 797 | + Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD, |
| 798 | + CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"", |
| 799 | + null, null, false); |
| 800 | + RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); |
| 801 | + avroSchema = avroSchema.copy(null, ruleSet); |
| 802 | + schemaRegistry.register(topic + "-value", avroSchema); |
| 803 | + |
| 804 | + bytes = specificAvroSerializer.serialize(topic, widget); |
| 805 | + obj = specificAvroDeserializer.deserialize(topic, bytes); |
| 806 | + assertTrue( |
| 807 | + "Returned object does not match", |
| 808 | + SpecificWidget.class.isInstance(obj) |
| 809 | + ); |
| 810 | + assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString()); |
| 811 | + } |
| 812 | + |
| 813 | + @Test |
| 814 | + public void testKafkaAvroSerializerSpecificFieldTransformWithMissingProp() throws Exception { |
| 815 | + byte[] bytes; |
| 816 | + Object obj; |
| 817 | + |
| 818 | + String schemaStr = "{\n" |
| 819 | + + " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n" |
| 820 | + + " \"type\": \"record\",\n" |
| 821 | + + " \"name\": \"SpecificWidget\",\n" |
| 822 | + + " \"fields\": [\n" |
| 823 | + + " {\n" |
| 824 | + + " \"name\": \"name\",\n" |
| 825 | + + " \"type\": \"string\"\n" |
| 826 | + + " },\n" |
| 827 | + + " {\n" |
| 828 | + + " \"name\": \"size\",\n" |
| 829 | + + " \"type\": \"int\"\n" |
| 830 | + + " },\n" |
| 831 | + + " {\n" |
| 832 | + + " \"name\": \"version\",\n" |
| 833 | + + " \"type\": \"int\"\n" |
| 834 | + + " },\n" |
| 835 | + + " {\n" |
| 836 | + + " \"name\": \"missing\",\n" |
| 837 | + + " \"type\": [\"null\", \"string\"],\n" |
| 838 | + + " \"default\": null\n" |
| 839 | + + " }\n" |
| 840 | + + " ]\n" |
| 841 | + + "}"; |
| 842 | + |
| 843 | + Schema schema = new Schema.Parser().parse(schemaStr); |
| 844 | + AvroSchema avroSchema = new AvroSchema(schema); |
| 845 | + Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD, |
| 846 | + CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"", |
| 847 | + null, null, false); |
| 848 | + RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); |
| 849 | + avroSchema = avroSchema.copy(null, ruleSet); |
| 850 | + schemaRegistry.register(topic + "-value", avroSchema); |
| 851 | + |
| 852 | + GenericRecord avroRecord = new GenericData.Record(schema); |
| 853 | + avroRecord.put("name", "alice"); |
| 854 | + avroRecord.put("size", 5); |
| 855 | + avroRecord.put("version", 1); |
| 856 | + bytes = specificAvroSerializer.serialize(topic, avroRecord); |
| 857 | + obj = specificAvroDeserializer.deserialize(topic, bytes); |
| 858 | + assertTrue( |
| 859 | + "Returned object does not match", |
| 860 | + SpecificWidget.class.isInstance(obj) |
| 861 | + ); |
| 862 | + assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString()); |
| 863 | + } |
| 864 | + |
| 865 | + @Test |
| 866 | + public void testKafkaAvroSerializerSpecificFieldTransformWithMissingPropReordered() |
| 867 | + throws Exception { |
| 868 | + byte[] bytes; |
| 869 | + Object obj; |
| 870 | + |
| 871 | + String schemaStr = "{\n" |
| 872 | + + " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n" |
| 873 | + + " \"type\": \"record\",\n" |
| 874 | + + " \"name\": \"SpecificWidget\",\n" |
| 875 | + + " \"fields\": [\n" |
| 876 | + + " {\n" |
| 877 | + + " \"name\": \"missing\",\n" |
| 878 | + + " \"type\": [\"null\", \"string\"],\n" |
| 879 | + + " \"default\": null\n" |
| 880 | + + " },\n" |
| 881 | + + " {\n" |
| 882 | + + " \"name\": \"name\",\n" |
| 883 | + + " \"type\": \"string\"\n" |
| 884 | + + " },\n" |
| 885 | + + " {\n" |
| 886 | + + " \"name\": \"size\",\n" |
| 887 | + + " \"type\": \"int\"\n" |
| 888 | + + " },\n" |
| 889 | + + " {\n" |
| 890 | + + " \"name\": \"version\",\n" |
| 891 | + + " \"type\": \"int\"\n" |
| 892 | + + " }\n" |
| 893 | + + " ]\n" |
| 894 | + + "}"; |
| 895 | + |
| 896 | + Schema schema = new Schema.Parser().parse(schemaStr); |
| 897 | + AvroSchema avroSchema = new AvroSchema(schema); |
| 898 | + Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD, |
| 899 | + CelFieldExecutor.TYPE, null, null, "typeName == 'STRING'; value + \"-suffix\"", |
| 900 | + null, null, false); |
| 901 | + RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule)); |
| 902 | + avroSchema = avroSchema.copy(null, ruleSet); |
| 903 | + schemaRegistry.register(topic + "-value", avroSchema); |
| 904 | + |
| 905 | + GenericRecord avroRecord = new GenericData.Record(schema); |
| 906 | + avroRecord.put("name", "alice"); |
| 907 | + avroRecord.put("size", 5); |
| 908 | + avroRecord.put("version", 1); |
| 909 | + bytes = specificAvroSerializer.serialize(topic, avroRecord); |
| 910 | + obj = specificAvroDeserializer.deserialize(topic, bytes); |
| 911 | + assertTrue( |
| 912 | + "Returned object does not match", |
| 913 | + SpecificWidget.class.isInstance(obj) |
| 914 | + ); |
| 915 | + assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString()); |
| 916 | + } |
| 917 | + |
777 | 918 | @Test |
778 | 919 | public void testKafkaAvroSerializerReflection() throws Exception { |
779 | 920 | byte[] bytes; |
@@ -1365,6 +1506,7 @@ public void testKafkaProtobufSerializer() throws Exception { |
1365 | 1506 | ((DynamicMessage)obj).getField(dynamicDesc.findFieldByName("name")) |
1366 | 1507 | ); |
1367 | 1508 | } |
| 1509 | + |
1368 | 1510 | @Test |
1369 | 1511 | public void testKafkaProtobuf2Serializer() throws Exception { |
1370 | 1512 | byte[] bytes; |
|
0 commit comments