This repository contains the smart serializer and deserializer for Apache Kafka® clients.
The smart serializer and deserializer allow engineers to create consumers and producers without prior knowledge of the underlying stream contents.
This repository includes all serializers and deserializers by default; if a low-dependency model is required, review the usage of the individual serializer and deserializer libraries outlined here.
Here is a talk from Current 24 where the library was presented.
Deserializer
```xml
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-client-smart-deserializer</artifactId>
    <version>${kafka-client-smart-ser-des.version}</version>
</dependency>
```

Serializer
```xml
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-client-smart-serializer</artifactId>
    <version>${kafka-client-smart-ser-des.version}</version>
</dependency>
```

Common (Schema Registry Client Config and Utils)
```xml
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-client-smart-common</artifactId>
    <version>${kafka-client-smart-ser-des.version}</version>
</dependency>
```
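The producer and consumer examples below construct the (de)serializers from the same `properties` object that is passed to the Kafka client. A minimal sketch of such a configuration, assuming a local broker and Schema Registry (the endpoints and any additional properties are placeholders for your own environment):

```java
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");          // Kafka brokers (placeholder)
properties.put("schema.registry.url", "http://localhost:8081"); // Schema Registry endpoint (placeholder)
properties.put("group.id", "smart-serdes-example");             // consumers only (placeholder)
```

Kafka Producer Initialization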
```java
KafkaProducer<K, V> producer = new KafkaProducer<>(properties,
        new ConfluentSerializer<>(properties, true),   // key serializer
        new ConfluentSerializer<>(properties, false)); // value serializer
```
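As a usage illustration only (the topic name and record contents are placeholders), the producer is then used like any other Kafka producer:

```java
producer.send(new ProducerRecord<>("example-topic", key, value)); // key/value of types K and V
producer.flush();
producer.close();
```

Kafka Consumer Initialization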
```java
try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(
        properties,
        new ConfluentDeserializer<>(properties, true, kClass),      // key deserializer
        new ConfluentDeserializer<V>(properties, false, vClass))) { // value deserializer
    // subscribe and poll records as usual
}
```

| Primitive Types | Serializer Docs | Deserializer Docs |
|---|---|---|
| String | Serializer | Deserializer |
| Boolean | Serializer | Deserializer |
| Float | Serializer | Deserializer |
| Double | Serializer | Deserializer |
| Integer | Serializer | Deserializer |
| Long | Serializer | Deserializer |
| Short | Serializer | Deserializer |
| Bytes | Serializer | Deserializer |
| ByteArray | Serializer | Deserializer |
| ByteBuffer | Serializer | Deserializer |
| UUID | Serializer | Deserializer |

| Schemaless Types | Serializer Docs | Deserializer Docs |
|---|---|---|
| KafkaJSON | Serializer | Deserializer |

| Schema Types | Serializer Docs | Deserializer Docs |
|---|---|---|
| KafkaAvro | Serializer | Deserializer |
| KafkaJSONSchema | Serializer | Deserializer |
| Protobuf | Serializer | Deserializer |
The SerializationTypes class determines which data type needs to be serialized/deserialized via a range of methods:
This method determines the serialization type from the record headers, comparing the header value against the matching data-type enum constant via valueOf.
This method determines the serialization type from a string, again compared against the matching enum constant via valueOf; a hedged sketch of both lookups follows.
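A minimal sketch of the header/string lookup, assuming a hypothetical header key and enum constants (the real library's names and defaults may differ):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class SerializationTypeResolver {
    // Hypothetical enum constants; the library's SerializationTypes may differ.
    enum SerializationTypes { STRING, BOOLEAN, INTEGER, LONG, AVRO, JSON, PROTOBUF }

    // Assumed header key carrying the type name.
    private static final String TYPE_HEADER = "serialization.type";

    static SerializationTypes fromHeaders(Headers headers) {
        Header header = headers.lastHeader(TYPE_HEADER);
        return header == null
                ? SerializationTypes.STRING // fall back to the default
                : fromString(new String(header.value(), StandardCharsets.UTF_8));
    }

    static SerializationTypes fromString(String type) {
        return SerializationTypes.valueOf(type.toUpperCase());
    }
}
```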
This method determines the serialization type of the message by comparing bytes (primitives are checked first, then schema types).

| Primitive Types | Bytes Check |
|---|---|
| String (Default) | bytes.length == 0 (edge case of possible empty string "") |
| Boolean | bytes.length == 1 && (bytes[0] == 0x00 OR bytes[0] == 0x01) |
| ByteArray | bytes.length == 1 |
| Short | bytes.length == 2 |
| Integer | bytes.length == 4 |
| Long | bytes.length == 8 |
| Schemaless Types | Bytes Check |
|---|---|
| JSON | (bytes[0] == '{' && bytes[bytes.length - 1] == '}') OR (bytes[0] == '[' && bytes[bytes.length - 1] == ']') |

The remaining schema types are compared in the Schema Check.
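Taken together, the byte checks above amount roughly to the following sketch (an illustration only, not the library's actual code; the returned names are placeholders):

```java
public class ByteChecks {
    // Rough sketch of the byte checks listed in the tables above.
    static String guessTypeFromBytes(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return "STRING";                          // default; covers the empty string ""
        }
        if (bytes.length == 1 && (bytes[0] == 0x00 || bytes[0] == 0x01)) {
            return "BOOLEAN";
        }
        switch (bytes.length) {
            case 1: return "BYTE_ARRAY";
            case 2: return "SHORT";
            case 4: return "INTEGER";
            case 8: return "LONG";
            default: break;
        }
        if ((bytes[0] == '{' && bytes[bytes.length - 1] == '}')
                || (bytes[0] == '[' && bytes[bytes.length - 1] == ']')) {
            return "JSON";
        }
        return "SCHEMA";                              // defer to the Schema Registry check below
    }
}
```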
This method determines the serialization type of the message using the Schema Registry supplier.

| Schema Types | Schema Type |
|---|---|
| Avro | AVRO |
| KafkaJSONSchema | JSON |
| Protobuf | PROTOBUF |
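As a hedged illustration of this lookup (assuming the standard Confluent wire format of a 0x0 magic byte followed by a 4-byte schema id, and the Schema Registry client API; this is not the library's actual implementation):

```java
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import java.nio.ByteBuffer;

public class SchemaTypeLookup {
    private static final byte MAGIC_BYTE = 0x0;

    // Returns "AVRO", "JSON" or "PROTOBUF" for schema-backed payloads.
    public static String schemaTypeOf(byte[] bytes, SchemaRegistryClient client) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Payload has no Schema Registry magic byte");
        }
        int schemaId = buffer.getInt(); // 4-byte schema id follows the magic byte
        return client.getSchemaById(schemaId).schemaType();
    }
}
```

The state diagram below summarizes the overall detection flow.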
```mermaid
stateDiagram-v2
    state if_state <<choice>>
    state fork_state <<fork>>
    [*] --> if_state
    if_state --> PrimitiveTypes: bytes <= 8
    if_state --> fork_state: bytes > 8
    fork_state --> SchemaRegistryTypes: hasMagicByte
    fork_state --> SchemalessTypes
    state SchemalessTypes {
        state is_json <<choice>>
        is_json --> json: if '{...}' or '[...]'
        is_json --> string: else
    }
```
We welcome contributions to the Kafka Client Smart Serializer/Deserializer project! Here's how you can get involved:
- Clone your fork locally:

  ```bash
  git clone https://github.com/your-username/kafka-client-smart-ser-des.git
  cd kafka-client-smart-ser-des
  ```

- Install dependencies:

  ```bash
  mvn clean install
  ```

- Run tests to ensure everything works:

  ```bash
  mvn clean test
  ```

- Run integration tests (if applicable):

  ```bash
  mvn verify
  ```

- Create a feature branch:

  ```bash
  git checkout -b feature/your-feature-name
  ```

- Make your changes following these guidelines:
  - Follow the existing code style and conventions
  - Add appropriate unit tests for new functionality
  - Update documentation as needed
  - Ensure all tests pass

- Commit your changes with clear, descriptive commit messages:

  ```bash
  git add .
  git commit -m "Add feature: brief description of changes"
  ```

- Push your branch to your fork:

  ```bash
  git push origin feature/your-feature-name
  ```

- Create a Pull Request on GitHub with:
  - A clear title and description
  - Reference to any related issues
  - Screenshots or examples if applicable

- Java: Follow standard Java conventions and the existing code style
- Testing: Maintain or improve test coverage
- Documentation: Update README and other docs as needed
- Backwards Compatibility: Consider impact on existing users
We welcome various types of contributions:
- Bug fixes
- New features
- Documentation improvements
- Test enhancements
- Performance optimizations
- Code refactoring
- Bug reports: Use GitHub Issues with detailed reproduction steps
- Feature requests: Use GitHub Issues with clear use cases
Please be respectful and constructive in all interactions. We aim to create a welcoming environment for all contributors.
Thank you for contributing to the Kafka Client Smart Serializer/Deserializer project!