Skip to content

Commit

Permalink
Consolidate serializer logic (#24239)
Browse files Browse the repository at this point in the history
* Move logic from AvroSchemaUtils into Serializer. Remove AvroSchemaUtils.

* Delete AvroSchemaUtilsTest and move tests into Serializer.

* Adding tests for getting Schema.

* Rename class to AvroSerializer.

* Aggregating logic into serializer.

* Adding documentation.

* Fixing checkstyle suppression.

* Fix checkstyle.

* Removing assert, runtime exceptions, and adding log.

Co-authored-by: Srikanta <51379715+srnagar@users.noreply.github.com>
  • Loading branch information
conniey and srnagar authored Sep 22, 2021
1 parent 945e314 commit c7cc461
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 325 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,66 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Base Codec class for Avro encoder and decoder implementations
* Class containing implementation of Apache Avro serializer
*/
class AvroSchemaRegistryUtils {
private final ClientLogger logger = new ClientLogger(AvroSchemaRegistryUtils.class);

class AvroSerializer {
private static final Map<Class<?>, Schema> PRIMITIVE_SCHEMAS;
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
private static final int V1_HEADER_LENGTH = 10;
private static final byte[] V1_HEADER = new byte[]{-61, 1};

private final ClientLogger logger = new ClientLogger(AvroSerializer.class);
private final boolean avroSpecificReader;
private final Schema.Parser parser;
private final EncoderFactory encoderFactory;
private final DecoderFactory decoderFactory;

static {
    // Each Avro primitive schema is created once and shared by every Java type that maps to it.
    final Map<Class<?>, Schema> schemaByType = new HashMap<>();

    final Schema booleanSchema = Schema.create(Schema.Type.BOOLEAN);
    for (Class<?> type : new Class<?>[] {Boolean.class, boolean.class}) {
        schemaByType.put(type, booleanSchema);
    }

    final Schema intSchema = Schema.create(Schema.Type.INT);
    for (Class<?> type : new Class<?>[] {Integer.class, int.class}) {
        schemaByType.put(type, intSchema);
    }

    final Schema longSchema = Schema.create(Schema.Type.LONG);
    for (Class<?> type : new Class<?>[] {Long.class, long.class}) {
        schemaByType.put(type, longSchema);
    }

    final Schema floatSchema = Schema.create(Schema.Type.FLOAT);
    for (Class<?> type : new Class<?>[] {Float.class, float.class}) {
        schemaByType.put(type, floatSchema);
    }

    final Schema doubleSchema = Schema.create(Schema.Type.DOUBLE);
    for (Class<?> type : new Class<?>[] {Double.class, double.class}) {
        schemaByType.put(type, doubleSchema);
    }

    // ByteBuffer is abstract but not final, so a lookup keyed on a runtime subclass will not hit this
    // entry; getPrimitiveSchema covers subclasses with an assignability check.
    final Schema bytesSchema = Schema.create(Schema.Type.BYTES);
    for (Class<?> type
        : new Class<?>[] {byte.class, Byte.class, byte[].class, Byte[].class, ByteBuffer.class}) {
        schemaByType.put(type, bytesSchema);
    }

    schemaByType.put(String.class, Schema.create(Schema.Type.STRING));

    // Publish a read-only view so the table cannot be modified after class initialization.
    PRIMITIVE_SCHEMAS = Collections.unmodifiableMap(schemaByType);
}

/**
* Instantiates an AvroSerializer instance
*
Expand All @@ -49,7 +92,7 @@ class AvroSchemaRegistryUtils {
* @param encoderFactory Encoder factory
* @param decoderFactory Decoder factory
*/
AvroSchemaRegistryUtils(boolean avroSpecificReader, Schema.Parser parser, EncoderFactory encoderFactory,
AvroSerializer(boolean avroSpecificReader, Schema.Parser parser, EncoderFactory encoderFactory,
DecoderFactory decoderFactory) {

this.avroSpecificReader = avroSpecificReader;
Expand All @@ -67,41 +110,19 @@ Schema parseSchemaString(String schemaString) {
return this.parser.parse(schemaString);
}

/**
* @param object Schema object used to generate schema string
*
* @return string representation of schema
*
* @see AvroSchemaUtils for distinction between primitive and Avro schema generation
*/
String getSchemaString(Object object) {
Schema schema = AvroSchemaUtils.getSchema(object);
return schema.toString();
}

/**
* Returns schema name for storing schemas in schema registry store.
*
* @param object Schema object used to generate schema path
*
* @return schema name as string
*
* @throws IllegalArgumentException if {@code object} is not a primitive type and not of type {@link
* GenericContainer}.
*/
String getSchemaName(Object object) {
return AvroSchemaUtils.getSchema(object).getFullName();
}

/**
* Returns a byte[] containing the Avro encoding of the object parameter.
*
* @param object Object to be encoded into byte stream
*
* @return A set of bytes that represent the object.
*
* @throws IllegalArgumentException If the object is not a serializable type.
* @throws IllegalStateException if the object could not be serialized to an object stream or there was a
* runtime exception during serialization.
*/
<T> byte[] encode(T object) throws IOException {
final Schema schema = AvroSchemaUtils.getSchema(object);
<T> byte[] encode(T object) {
final Schema schema = getSchema(object);

try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
if (object instanceof byte[]) {
Expand Down Expand Up @@ -159,31 +180,31 @@ <T> T decode(byte[] bytes, byte[] schemaBytes, TypeReference<T> typeReference) {
}

/**
* Returns correct reader for decoding payload.
* Returns Avro schema for specified object, including null values
*
* @param writerSchema Avro schema fetched from schema registry store
* @param object object for which Avro schema is being returned
*
* @return correct Avro DatumReader object given encoder configuration
* @return Avro schema for object's data structure
*
* @throws IllegalArgumentException if object type is unsupported.
*/
@SuppressWarnings("unchecked")
private <T> DatumReader<T> getDatumReader(Schema writerSchema, TypeReference<T> typeReference) {
boolean writerSchemaIsPrimitive = writerSchema.getType() != null
&& AvroSchemaUtils.getPrimitiveSchemas().containsKey(writerSchema.getType());
static Schema getSchema(Object object) {
if (object instanceof GenericContainer) {
return ((GenericContainer) object).getSchema();
}

if (writerSchemaIsPrimitive) {
if (avroSpecificReader) {
return new SpecificDatumReader<>(writerSchema);
} else {
return new GenericDatumReader<>(writerSchema);
}
if (object == null) {
return NULL_SCHEMA;
}

// Suppressing this warning because we know that the Type is a representation of the Class<T>
final Class<T> clazz = (Class<T>) typeReference.getJavaType();
if (SpecificRecord.class.isAssignableFrom(clazz)) {
return new SpecificDatumReader<>(writerSchema);
final Class<?> objectClass = object.getClass();
final Schema primitiveSchema = getPrimitiveSchema(objectClass);
if (primitiveSchema != null) {
return primitiveSchema;
} else {
return new GenericDatumReader<>(writerSchema);
throw new IllegalArgumentException("Unsupported Avro type. Supported types are null, GenericContainer,"
+ " Boolean, Integer, Long, Float, Double, String, Byte[], Byte, ByteBuffer, and their primitive"
+ " equivalents. Actual: " + objectClass);
}
}

Expand All @@ -195,15 +216,64 @@ private <T> DatumReader<T> getDatumReader(Schema writerSchema, TypeReference<T>
* </ul>
*
* @param schemaBytes Bytes to read from.
*
* @return true if the object has the single object payload header; false otherwise.
*
* @see <a href="https://avro.apache.org/docs/current/spec.html#single_object_encoding">Single Object Encoding</a>
*/
private static boolean isSingleObjectEncoded(byte[] schemaBytes) {
static boolean isSingleObjectEncoded(byte[] schemaBytes) {
    // A payload shorter than the full V1 header cannot be single-object encoded.
    final boolean hasRoomForHeader = schemaBytes.length >= V1_HEADER_LENGTH;

    // Only the two leading marker bytes are compared; the rest of the header is not inspected here.
    return hasRoomForHeader
        && schemaBytes[0] == V1_HEADER[0]
        && schemaBytes[1] == V1_HEADER[1];
}

/**
 * Looks up the Avro primitive schema that corresponds to a Java class.
 *
 * <p>An exact entry in the primitive schema table takes precedence. Failing that, any
 * {@link CharSequence} implementation resolves to the schema registered for {@link String}, and any
 * {@link ByteBuffer} subclass resolves to the schema registered for {@code Byte[]}.</p>
 *
 * @param clazz Class to look up.
 *
 * @return The matching primitive schema, or {@code null} if {@code clazz} is not an Avro primitive type.
 */
private static Schema getPrimitiveSchema(Class<?> clazz) {
    final Schema exactMatch = PRIMITIVE_SCHEMAS.get(clazz);
    if (exactMatch != null) {
        return exactMatch;
    }

    // No exact entry: fall back to assignability checks for the two open-ended type families.
    if (CharSequence.class.isAssignableFrom(clazz)) {
        return PRIMITIVE_SCHEMAS.get(String.class);
    }

    if (ByteBuffer.class.isAssignableFrom(clazz)) {
        return PRIMITIVE_SCHEMAS.get(Byte[].class);
    }

    return null;
}

/**
 * Selects the Avro reader used to decode a payload into the requested type.
 *
 * <p>When the requested type is an Avro primitive, the serializer's {@code avroSpecificReader} setting
 * decides between a specific and a generic reader. For every other type, a specific reader is used only
 * if the type implements {@link SpecificRecord}.</p>
 *
 * @param writerSchema Avro schema the payload was written with.
 * @param typeReference Type the payload is being decoded into.
 * @param <T> Type the payload is being decoded into.
 *
 * @return A {@link SpecificDatumReader} or {@link GenericDatumReader} created with {@code writerSchema}.
 */
@SuppressWarnings("unchecked")
private <T> DatumReader<T> getDatumReader(Schema writerSchema, TypeReference<T> typeReference) {
    // Suppressing this warning because we know that the Type is a representation of the Class<T>
    final Class<T> targetClass = (Class<T>) typeReference.getJavaType();

    // Primitive targets defer to the configured flag; all other targets are checked for SpecificRecord.
    final boolean useSpecificReader = getPrimitiveSchema(targetClass) != null
        ? avroSpecificReader
        : SpecificRecord.class.isAssignableFrom(targetClass);

    if (useSpecificReader) {
        return new SpecificDatumReader<>(writerSchema);
    }

    return new GenericDatumReader<>(writerSchema);
}
}
Loading

0 comments on commit c7cc461

Please sign in to comment.