diff --git a/src/main/java/org/akhq/utils/AvroSerializer.java b/src/main/java/org/akhq/utils/AvroSerializer.java index 3399564d6..9304b2295 100644 --- a/src/main/java/org/akhq/utils/AvroSerializer.java +++ b/src/main/java/org/akhq/utils/AvroSerializer.java @@ -8,7 +8,6 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; -import org.apache.commons.collections.CollectionUtils; import java.math.BigDecimal; import java.math.MathContext; @@ -55,26 +54,53 @@ public class AvroSerializer { .toFormatter(); public static GenericRecord recordSerializer(Map record, Schema schema) { - GenericRecord returnValue = new GenericData.Record(schema); - Set schemaFields = schema.getFields().stream() - .map(Schema.Field::name).collect(Collectors.toSet()); - - Set recordFields = record.keySet(); - - if (schemaFields.size() != recordFields.size()) { - Object[] missingFields = CollectionUtils.disjunction(schemaFields, recordFields).stream().toArray(); - throw new IllegalArgumentException(" Record does not contain followings fields ".concat(Arrays.toString(missingFields))); - } + validateSchema(schema.getFields(), record); + GenericRecord returnValue = new GenericData.Record(schema); schema .getFields() .forEach(field -> { Object fieldValue = record.getOrDefault(field.name(), field.defaultVal()); returnValue.put(field.name(), AvroSerializer.objectSerializer(fieldValue, field.schema())); }); + return returnValue; } + private static void validateSchema(List fields, Map record) { + for (Schema.Field field : fields) { + var schema = field.schema(); + var type = schema.getType(); + var value = Optional.ofNullable(record) + .filter(Objects::nonNull) + .map(r -> r.get(field.name())); + var hasEmptyValue = value.isEmpty(); + + validateSchemaHasDefaultValue(field, schema, hasEmptyValue); + + if (Schema.Type.RECORD.getName().equals(type.getName()) && !hasEmptyValue) { + validateSchema(schema.getFields(), (Map) value.get()); + } + else if (Schema.Type.ARRAY.getName().equals(type.getName()) && !hasEmptyValue) { + Schema elementType = schema.getElementType(); + if (elementType.getType().equals(Schema.Type.RECORD)) { + for(Map val : (List>) value.get()) { + validateSchema(elementType.getFields(), val); + } + } + } + } + } + + private static void validateSchemaHasDefaultValue(Schema.Field field, Schema schema, boolean hasEmptyValue) { + var isFieldHasNullValue = field.hasDefaultValue() || schema.isNullable(); + + if ((!isFieldHasNullValue) && hasEmptyValue) { + var message = String.format("Field %s is missing in the payload", field.name()); + throw new IllegalArgumentException(message); + } + } + @SuppressWarnings("unchecked") private static Object objectSerializer(Object value, Schema schema) { if (value == org.apache.avro.JsonProperties.NULL_VALUE) { diff --git a/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java b/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java index bb63be36e..f3e9c6ca4 100644 --- a/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java +++ b/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java @@ -3,6 +3,8 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema; import org.akhq.configs.SchemaRegistryType; import org.akhq.modules.schemaregistry.AvroSerializer; + +import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -11,8 +13,7 @@ import java.nio.ByteBuffer; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; @ExtendWith(MockitoExtension.class) class AvroSchemaSerializerTest { @@ -26,6 +27,60 @@ class AvroSchemaSerializerTest { .name("rating").type().doubleType().noDefault() .endRecord(); + private final org.apache.avro.Schema NESTED_SCHEMA = + new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"userInfo\",\n" + + " \"namespace\": \"org.akhq\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"username\",\n" + + " \"type\": \"string\",\n" + + " \"default\": \"NONE\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age\",\n" + + " \"type\": \"int\",\n" + + " \"default\": -1\n" + + " },\n" + + " {\n" + + " \"name\": \"phone\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"address\",\n" + + " \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"mailing_address\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"street\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"detailaddress\",\n" + + " \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"homeaddress\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"houseNo\",\n" + + " \"type\": \"int\",\n" + + " \"default\": 1\n" + + " },\n" + + " {\n" + + " \"name\": \"roomNo\",\n" + + " \"type\": \"int\"\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"); + public static final String VALID_JSON = "{\n" + " \"title\": \"the-title\",\n" + " \"release_year\": 123,\n" + @@ -38,11 +93,34 @@ class AvroSchemaSerializerTest { " \"rating\": 2.5\n" + "}"; + public static final String INVALID_NESTED_JSON = "{\n" + + " \"phone\": \"12345\",\n" + + " \"address\": {\n" + + " \"street\": \"Test Street\",\n" + + " \"detailaddress\" : {\n" + + " \n" + + " }\n" + + " }\n" + + "}"; + + public static final String VALID_NESTED_JSON = "{\n" + + " \"phone\": \"2312331\",\n" + + " \"address\": {\n" + + " \"street\": \"Test Street\",\n" + + " \"detailaddress\" : {\n" + + " \"houseNo\" : 1,\n" + + " \"roomNo\" : 2\n" + + " }\n" + + " }\n" + + "}"; + private AvroSerializer avroSerializer; + private AvroSerializer avroDeepSerializer; @BeforeEach void setUp() { avroSerializer = AvroSerializer.newInstance(SCHEMA_ID, new AvroSchema(SCHEMA), SchemaRegistryType.CONFLUENT); + avroDeepSerializer = AvroSerializer.newInstance(SCHEMA_ID, new AvroSchema(NESTED_SCHEMA), SchemaRegistryType.CONFLUENT); } @Test @@ -59,9 +137,16 @@ void shouldSerializeSchemaId() { @Test void shouldFailIfDoesntMatchSchemaId() { - assertThrows(NullPointerException.class, () -> { - int schemaId = 3; - avroSerializer.serialize(INVALID_JSON); - }); + assertThrows(IllegalArgumentException.class, () -> avroSerializer.serialize(INVALID_JSON)); + } + + @Test + void shouldThrowForDeepNestedInvalidJSON() { + assertThrows(IllegalArgumentException.class, () -> avroDeepSerializer.serialize(INVALID_NESTED_JSON)); + } + + @Test + void shouldNotThrowForValidNestedJSON() { + assertDoesNotThrow(() -> avroDeepSerializer.serialize(VALID_NESTED_JSON)); } }