Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,7 @@
public Object transformMessage(RuleContext ctx, FieldTransform transform, Object message)
throws RuleException {
try {
// Use the schema from the message if it exists, so schema evolution works properly
Schema schema = message instanceof GenericContainer
? ((GenericContainer) message).getSchema()
: this.rawSchema();
Schema schema = this.rawSchema();
return toTransformedMessage(ctx, schema, message, transform);
} catch (RuntimeException e) {
if (e.getCause() instanceof RuleException) {
Expand All @@ -546,7 +543,7 @@
}
}

private Object toTransformedMessage(

Check failure on line 546 in client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java#L546

Refactor this method to reduce its Cognitive Complexity from 40 to the 15 allowed.
RuleContext ctx, Schema schema, Object message, FieldTransform transform) {
FieldContext fieldCtx = ctx.currentField();
if (schema == null) {
Expand Down Expand Up @@ -584,16 +581,27 @@
if (message == null) {
return message;
}
data = AvroSchemaUtils.getData(schema, message, false, false);
for (Schema.Field f : schema.getFields()) {
String fullName = schema.getFullName() + "." + f.name();
Schema recordSchema = schema;
if (message instanceof GenericContainer) {
// Use the schema from the message if it exists, so schema evolution works properly
recordSchema = ((GenericContainer) message).getSchema();
}
data = AvroSchemaUtils.getData(recordSchema, message, false, false);
for (Schema.Field f : recordSchema.getFields()) {
// The original field has tags needed for inline tag matching
Schema.Field originalField = schema.getField(f.name());
if (originalField == null) {
originalField = f;
}
String fullName = recordSchema.getFullName() + "." + f.name();
try (FieldContext fc = ctx.enterField(
message, fullName, f.name(), getType(f.schema()), getInlineTags(f))) {
message, fullName, f.name(),
getType(originalField.schema()), getInlineTags(originalField))) {
Object value = data.getField(message, f.name(), f.pos());
if (value instanceof Utf8) {
value = value.toString();
}
Object newValue = toTransformedMessage(ctx, f.schema(), value, transform);
Object newValue = toTransformedMessage(ctx, originalField.schema(), value, transform);
if (ctx.rule().getKind() == RuleKind.CONDITION) {
if (Boolean.FALSE.equals(newValue)) {
throw new RuntimeException(new RuleConditionException(ctx.rule()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,13 @@
public void testKafkaAvroSerializer() throws Exception {
IndexedRecord avroRecord = createUserRecord();
AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
Rule rule = new Rule("myRule", null, RuleKind.CONDITION, RuleMode.WRITEREAD,

Check failure on line 428 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L428

Define a constant instead of duplicating this literal "myRule" 64 times.
CelExecutor.TYPE, null, null,
"message.name == \"testUser\" && size(message.name) == 8 && message.kind == \"ONE\"",
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

Check failure on line 434 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L434

Define a constant instead of duplicating this literal "-value" 64 times.

byte[] bytes = avroSerializer.serialize(topic, avroRecord);
assertEquals(avroRecord, avroDeserializer.deserialize(topic, bytes));
Expand Down Expand Up @@ -791,7 +791,7 @@
byte[] bytes;
Object obj;

SpecificWidget widget = new SpecificWidget("alice", 5, 1);

Check failure on line 794 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L794

Define a constant instead of duplicating this literal "alice" 44 times.
Schema schema = widget.getSchema();
AvroSchema avroSchema = new AvroSchema(schema);
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
Expand All @@ -804,12 +804,57 @@
bytes = specificAvroSerializer.serialize(topic, widget);
obj = specificAvroDeserializer.deserialize(topic, bytes);
assertTrue(
"Returned object does not match",

Check failure on line 807 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L807

Define a constant instead of duplicating this literal "Returned object does not match" 122 times.
SpecificWidget.class.isInstance(obj)
);
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());

Check failure on line 810 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L810

Define a constant instead of duplicating this literal "alice-suffix-suffix" 4 times.
}

@Test
public void testKafkaAvroSerializerSpecificFieldTransformWithTag() throws Exception {
byte[] bytes;
Object obj;

String schemaStr = "{\n"
+ " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\",\n"

Check failure on line 819 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L819

Define a constant instead of duplicating this literal " \"namespace\": \"io.confluent.kafka.schemaregistry.rules\", " 3 times.
+ " \"type\": \"record\",\n"

Check failure on line 820 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L820

Define a constant instead of duplicating this literal " \"type\": \"record\", " 3 times.
+ " \"name\": \"SpecificWidget\",\n"

Check failure on line 821 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L821

Define a constant instead of duplicating this literal " \"name\": \"SpecificWidget\", " 3 times.
+ " \"fields\": [\n"

Check failure on line 822 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L822

Define a constant instead of duplicating this literal " \"fields\": [ " 3 times.
+ " {\n"

Check failure on line 823 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L823

Define a constant instead of duplicating this literal " { " 11 times.
+ " \"name\": \"name\",\n"

Check failure on line 824 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L824

Define a constant instead of duplicating this literal " \"name\": \"name\", " 3 times.
+ " \"type\": \"string\",\n"
+ " \"confluent:tags\": [ \"PII\" ]\n"
+ " },\n"

Check failure on line 827 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L827

Define a constant instead of duplicating this literal " }, " 9 times.
+ " {\n"
+ " \"name\": \"size\",\n"

Check failure on line 829 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L829

Define a constant instead of duplicating this literal " \"name\": \"size\", " 3 times.
+ " \"type\": \"int\"\n"

Check failure on line 830 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L830

Define a constant instead of duplicating this literal " \"type\": \"int\" " 6 times.
+ " },\n"
+ " {\n"
+ " \"name\": \"version\",\n"

Check failure on line 833 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L833

Define a constant instead of duplicating this literal " \"name\": \"version\", " 3 times.
+ " \"type\": \"int\"\n"
+ " }\n"

Check failure on line 835 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L835

Define a constant instead of duplicating this literal " } " 5 times.
+ " ]\n"

Check failure on line 836 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L836

Define a constant instead of duplicating this literal " ] " 3 times.
+ "}";

Schema schema = new Schema.Parser().parse(schemaStr);
AvroSchema avroSchema = new AvroSchema(schema);
Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.WRITEREAD,
CelFieldExecutor.TYPE, Collections.singleton("PII"), null, "value + \"-suffix\"",

Check failure on line 842 in schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java

View check run for this annotation

SonarQube-Confluent / schema-registry Sonarqube Results

schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorTest.java#L842

Define a constant instead of duplicating this literal "value + \"-suffix\"" 14 times.
null, null, false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), Collections.singletonList(rule));
avroSchema = avroSchema.copy(null, ruleSet);
schemaRegistry.register(topic + "-value", avroSchema);

SpecificWidget widget = new SpecificWidget("alice", 5, 1);
bytes = specificAvroSerializer.serialize(topic, widget);
obj = specificAvroDeserializer.deserialize(topic, bytes);
assertTrue(
"Returned object does not match",
SpecificWidget.class.isInstance(obj)
);
assertEquals("alice-suffix-suffix", ((SpecificWidget)obj).getName().toString());
}

@Test
public void testKafkaAvroSerializerSpecificFieldTransformWithMissingProp() throws Exception {
byte[] bytes;
Expand Down