@@ -27,13 +27,15 @@
 import java.util.Map;
 
 /**
- * Deserializer for {@link CloudEvent}.
+ * Kafka {@link Deserializer} for {@link CloudEvent}.
  * <p>
- * To configure the {@link CloudEventDataMapper} to use, you can provide the instance through the configuration key
- * {@link CloudEventDeserializer#MAPPER_CONFIG}.
+ * To configure a {@link CloudEventDataMapper}, you can provide the instance through the configuration key {@link CloudEventDeserializer#MAPPER_CONFIG}.
  */
 public class CloudEventDeserializer implements Deserializer<CloudEvent> {
 
+    /**
+     * The configuration key for the {@link CloudEventDataMapper}.
+     */
     public final static String MAPPER_CONFIG = "cloudevents.datamapper";
 
     private CloudEventDataMapper<? extends CloudEventData> mapper = null;
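For context, a minimal sketch of wiring `CloudEventDeserializer` into a consumer. The broker address, group id, and topic name are placeholders, and the commented line shows where a `CloudEventDataMapper` instance would be passed under the `MAPPER_CONFIG` key documented above:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CloudEventConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cloudevents-example");     // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
        // To plug in a custom data mapper, pass the instance under the documented key:
        // props.put(CloudEventDeserializer.MAPPER_CONFIG, myDataMapper);

        try (KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // placeholder topic
            ConsumerRecords<String, CloudEvent> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> System.out.println(record.value()));
        }
    }
}
```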
@@ -22,7 +22,7 @@
 import org.apache.kafka.common.serialization.Deserializer;
 
 /**
- * Deserializer for {@link MessageReader}
+ * Kafka {@link Deserializer} for {@link MessageReader}.
  */
 public class CloudEventMessageDeserializer implements Deserializer<MessageReader> {
 
@@ -25,7 +25,9 @@
 import java.util.Map;
 
 /**
- * Serializer for {@link MessageReader}. This {@link Serializer} can't be used as a key serializer.
+ * Kafka {@link Serializer} for {@link MessageReader}.
+ * <p>
+ * This {@link Serializer} can't be used as a key serializer.
  */
 public class CloudEventMessageSerializer implements Serializer<MessageReader> {
 
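A sketch of the value-serializer side of this class, assuming the usual producer setup; since this serializer can't be used for keys, the key still needs an ordinary serializer such as `StringSerializer`:

```java
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.kafka.CloudEventMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MessageForwarderExample {
    public static KafkaProducer<String, MessageReader> buildForwarder() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // Keys still need an ordinary serializer; CloudEventMessageSerializer is value-only.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventMessageSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
```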
@@ -28,16 +28,23 @@
 import java.util.Map;
 
 /**
- * Serializer for {@link CloudEvent}.
+ * Kafka {@link Serializer} for {@link CloudEvent}.
  * <p>
- * To configure the encoding to serialize the event, you can use the {@link CloudEventSerializer#ENCODING_CONFIG} configuration key,
+ * To configure the encoding to use when serializing the event, you can use the {@link CloudEventSerializer#ENCODING_CONFIG} configuration key,
  * which accepts both a {@link String} or a variant of the enum {@link Encoding}. If you configure the Encoding as {@link Encoding#STRUCTURED},
  * you MUST configure the {@link EventFormat} to use with {@link CloudEventSerializer#EVENT_FORMAT_CONFIG}, specifying a {@link String}
  * corresponding to the content type of the event format or specifying an instance of {@link EventFormat}.
  */
 public class CloudEventSerializer implements Serializer<CloudEvent> {
 
+    /**
+     * The configuration key for the {@link Encoding} to use when serializing the event.
+     */
     public final static String ENCODING_CONFIG = "cloudevents.serializer.encoding";
 
+    /**
+     * The configuration key for the {@link EventFormat} to use when serializing the event in structured mode.
+     */
     public final static String EVENT_FORMAT_CONFIG = "cloudevents.serializer.event_format";
 
     private Encoding encoding = Encoding.BINARY;
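A sketch of configuring structured mode with the two keys documented above, assuming the cloudevents-json-jackson module is on the classpath so `JsonFormat.CONTENT_TYPE` (`application/cloudevents+json`) can identify the event format; the broker address is a placeholder:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CloudEventProducerConfigExample {
    public static KafkaProducer<String, CloudEvent> buildProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
        // Structured mode requires an event format; here it is identified by its content type.
        props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
        props.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
        return new KafkaProducer<>(props);
    }
}
```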
kafka/src/main/java/io/cloudevents/kafka/KafkaMessageFactory.java (10 additions & 4 deletions)
@@ -17,13 +17,15 @@

 package io.cloudevents.kafka;
 
+import io.cloudevents.SpecVersion;
 import io.cloudevents.core.message.MessageReader;
 import io.cloudevents.core.message.MessageWriter;
 import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
 import io.cloudevents.core.message.impl.MessageUtils;
 import io.cloudevents.kafka.impl.KafkaBinaryMessageReaderImpl;
 import io.cloudevents.kafka.impl.KafkaHeaders;
 import io.cloudevents.kafka.impl.KafkaProducerMessageWriterImpl;
+import io.cloudevents.rw.CloudEventRWException;
 import io.cloudevents.rw.CloudEventWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -34,6 +36,9 @@
 /**
  * This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader}
  * and {@link io.cloudevents.core.message.MessageWriter} for Kafka Producer and Consumer.
+ * <p>
+ * These can be used as an alternative to {@link CloudEventDeserializer} and {@link CloudEventSerializer} to
+ * manually serialize/deserialize {@link io.cloudevents.CloudEvent} messages.
  */
 @ParametersAreNonnullByDefault
 public final class KafkaMessageFactory {
@@ -42,20 +47,21 @@ private KafkaMessageFactory() {
     }
 
     /**
-     * Create a {@link io.cloudevents.core.message.MessageReader} to read {@link ConsumerRecord}
+     * Create a {@link io.cloudevents.core.message.MessageReader} to read {@link ConsumerRecord}.
      *
      * @param record the record to convert to {@link io.cloudevents.core.message.MessageReader}
      * @param <K>    the type of the record key
      * @return the new {@link io.cloudevents.core.message.MessageReader}
+     * @throws CloudEventRWException if something goes wrong while resolving the {@link SpecVersion} or if the message has unknown encoding
      */
-    public static <K> MessageReader createReader(ConsumerRecord<K, byte[]> record) throws IllegalArgumentException {
+    public static <K> MessageReader createReader(ConsumerRecord<K, byte[]> record) throws CloudEventRWException {
         return createReader(record.headers(), record.value());
     }
 
     /**
      * @see #createReader(ConsumerRecord)
      */
-    public static MessageReader createReader(Headers headers, byte[] payload) throws IllegalArgumentException {
+    public static MessageReader createReader(Headers headers, byte[] payload) throws CloudEventRWException {
         return MessageUtils.parseStructuredOrBinaryMessage(
             () -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.CONTENT_TYPE),
             format -> new GenericStructuredMessageReader(format, payload),
@@ -65,7 +71,7 @@ public static MessageReader createReader(Headers headers, byte[] payload) throws
     }
 
     /**
-     * Create a {@link io.cloudevents.core.message.MessageWriter} to write a {@link org.apache.kafka.clients.producer.ProducerRecord}
+     * Create a {@link io.cloudevents.core.message.MessageWriter} to write a {@link org.apache.kafka.clients.producer.ProducerRecord}.
      *
      * @param topic     the topic where to write the record
      * @param partition the partition where to write the record
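A sketch of the manual path the new class javadoc describes, assuming records are consumed with a plain `byte[]` value deserializer; `toCloudEvent` is a hypothetical helper name and the error handling is just one option:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.kafka.KafkaMessageFactory;
import io.cloudevents.rw.CloudEventRWException;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ManualReadExample {
    // Convert a record consumed with a plain byte[] value deserializer into a CloudEvent.
    static CloudEvent toCloudEvent(ConsumerRecord<String, byte[]> record) {
        try {
            MessageReader reader = KafkaMessageFactory.createReader(record);
            return reader.toEvent();
        } catch (CloudEventRWException e) {
            // Unknown encoding or unresolvable spec version, per the new @throws contract.
            throw new IllegalStateException("Record is not a valid CloudEvent", e);
        }
    }
}
```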
@@ -11,11 +11,15 @@
  * This {@link ProducerInterceptor} implements the partitioning extension,
  * as described in the <a href="https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping">CloudEvents Kafka specification</a>.
  * <p>
- * When using in your producer, it will pick the {@code partitionkey} extension from the event and will set it as record key.
+ * When using in your {@link org.apache.kafka.clients.producer.KafkaProducer},
+ * it will pick the {@code partitionkey} extension from the event and will set it as record key.
  * If the extension is missing, It won't replace the key from the original record.
  */
 public class PartitionKeyExtensionInterceptor implements ProducerInterceptor<Object, CloudEvent> {
 
+    /**
+     * The extension key of partition key extension.
+     */
     public static final String PARTITION_KEY_EXTENSION = "partitionkey";
 
     @Override
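A sketch of the interceptor in use: register it through the standard `interceptor.classes` producer config and attach the `partitionkey` extension when building the event. Topic, id, source, type, and key values are placeholders:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.net.URI;
import java.util.Properties;

public class PartitionKeyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            PartitionKeyExtensionInterceptor.class.getName());
        // ...plus the usual bootstrap servers and serializer settings.

        CloudEvent event = CloudEventBuilder.v1()
            .withId("1")                                   // placeholder id
            .withType("example.event")                     // placeholder type
            .withSource(URI.create("urn:example:source"))  // placeholder source
            .withExtension(PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION, "order-42")
            .build();

        // When sent through a producer configured this way, the interceptor
        // copies "order-42" from the extension into the record key.
        ProducerRecord<Object, CloudEvent> record = new ProducerRecord<>("events", event);
    }
}
```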
pom.xml (1 addition & 0 deletions)
@@ -162,6 +162,7 @@
                 <link>https://docs.spring.io/spring-framework/docs/current/javadoc-api/</link>
                 <link>https://vertx.io/docs/apidocs/</link>
                 <link>https://jakarta.ee/specifications/platform/8/apidocs/</link>
+                <link>https://kafka.apache.org/25/javadoc/</link>
                 <link>https://qpid.apache.org/releases/qpid-proton-j-0.33.7/api/</link>
             </links>
         </configuration>