Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,6 @@ You can configure the Encoding and EventFormat to use to emit the event.
Check out the [`CloudEventSerializer`](src/main/java/io/cloudevents/kafka/CloudEventSerializer.java)
javadoc for more info.

### Partition key extension

If you want your producer to use the `partitionkey` extension, you can use the [`PartitionKeyExtensionInterceptor`](src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java).

```java
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, io.cloudevents.kafka.PartitionKeyExtensionInterceptor.class);
```

When using in your producer, this interceptor will pick the `partitionkey` extension from the event and will set it as record key.
Check out the [`PartitionKeyExtensionInterceptor`](src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java)
javadoc for more info.

## Consuming CloudEvents

To consume CloudEvents in Kafka, configure the KafkaConsumer to use the provided [`CloudEventDeserializer`](src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.cloudevents.kafka;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rw.CloudEventDataMapper;
import org.apache.kafka.common.header.Headers;
Expand All @@ -36,13 +35,13 @@ public class CloudEventDeserializer implements Deserializer<CloudEvent> {

public final static String MAPPER_CONFIG = "cloudevents.datamapper";

private CloudEventDataMapper<? extends CloudEventData> mapper = null;
private CloudEventDataMapper mapper = null;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Object mapperConfig = configs.get(MAPPER_CONFIG);
if (mapperConfig instanceof CloudEventDataMapper) {
this.mapper = (CloudEventDataMapper<? extends CloudEventData>) mapperConfig;
this.mapper = (CloudEventDataMapper) mapperConfig;
} else if (mapperConfig != null) {
throw new IllegalArgumentException(MAPPER_CONFIG + " can be of type String or " + CloudEventDataMapper.class.getCanonicalName());
}
Expand Down

This file was deleted.

This file was deleted.