
PartitionKey Kafka Interceptor #260


Merged (2 commits) on Nov 13, 2020
kafka/README.md (12 additions & 0 deletions)
@@ -57,6 +57,18 @@ You can configure the Encoding and EventFormat to use to emit the event.
Check out the [`CloudEventSerializer`](src/main/java/io/cloudevents/kafka/CloudEventSerializer.java)
javadoc for more info.

### Partition key extension

If you want your producer to use the `partitionkey` extension, you can use the [`PartitionKeyExtensionInterceptor`](src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java).

```java
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, io.cloudevents.kafka.PartitionKeyExtensionInterceptor.class.getName());
```

When used in your producer, this interceptor picks the `partitionkey` extension from the event and sets it as the record key.
Check out the [`PartitionKeyExtensionInterceptor`](src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java)
javadoc for more info.
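
For a fuller picture, a producer configured with both the serializer and this interceptor might look like the sketch below; the bootstrap servers, topic name, key serializer and event attributes are placeholder assumptions, not part of this change.

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.kafka.CloudEventSerializer;
import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.util.Properties;

public class PartitionKeyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "localhost:9092" and "my-topic" below are placeholders.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

        try (KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(props)) {
            CloudEvent event = CloudEventBuilder.v1()
                .withId("1")
                .withType("example.type")
                .withSource(URI.create("/example/source"))
                // The interceptor copies this extension into the record key before the record is partitioned.
                .withExtension(PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION, "my-partition-key")
                .build();
            producer.send(new ProducerRecord<>("my-topic", event));
        }
    }
}
```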

## Consuming CloudEvents

To consume CloudEvents in Kafka, configure the KafkaConsumer to use the provided [`CloudEventDeserializer`](src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java):
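
A minimal consumer-side sketch of that configuration (bootstrap servers, group id and topic name are placeholder assumptions; only `CloudEventDeserializer` comes from this module):

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CloudEventConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "localhost:9092", "my-group" and "my-topic" below are placeholders.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());

        try (KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, CloudEvent> records = consumer.poll(Duration.ofSeconds(1));
            // Each record value is already a CloudEvent; the record key is whatever the producer
            // (or the PartitionKeyExtensionInterceptor) set it to.
            records.forEach(record -> System.out.println(record.key() + " -> " + record.value().getId()));
        }
    }
}
```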
kafka/src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java
@@ -18,6 +18,7 @@
package io.cloudevents.kafka;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rw.CloudEventDataMapper;
import org.apache.kafka.common.header.Headers;
@@ -35,13 +36,13 @@ public class CloudEventDeserializer implements Deserializer<CloudEvent> {

public final static String MAPPER_CONFIG = "cloudevents.datamapper";

private CloudEventDataMapper mapper = null;
private CloudEventDataMapper<? extends CloudEventData> mapper = null;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Object mapperConfig = configs.get(MAPPER_CONFIG);
if (mapperConfig instanceof CloudEventDataMapper) {
this.mapper = (CloudEventDataMapper) mapperConfig;
this.mapper = (CloudEventDataMapper<? extends CloudEventData>) mapperConfig;
} else if (mapperConfig != null) {
throw new IllegalArgumentException(MAPPER_CONFIG + " can be of type String or " + CloudEventDataMapper.class.getCanonicalName());
}
kafka/src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java
@@ -0,0 +1,45 @@
package io.cloudevents.kafka;

import io.cloudevents.CloudEvent;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
* This {@link ProducerInterceptor} implements the partitioning extension,
* as described in the <a href="https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping">CloudEvents Kafka specification</a>.
* <p>
* When used in your producer, it picks the {@code partitionkey} extension from the event and sets it as the record key.
* If the extension is missing, it won't replace the key of the original record.
*/
public class PartitionKeyExtensionInterceptor implements ProducerInterceptor<Object, CloudEvent> {

public static final String PARTITION_KEY_EXTENSION = "partitionkey";

@Override
public ProducerRecord<Object, CloudEvent> onSend(ProducerRecord<Object, CloudEvent> record) {
if (record.value() == null) {
return record;
}
Object partitionKey = record.value().getExtension(PARTITION_KEY_EXTENSION);
if (partitionKey == null) {
return record;
}

return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), partitionKey, record.value(), record.headers());

Review comment:

the second field should be calculated based upon 1) the number of partitions, 2) the extension, 3) the input record

Member Author:

I don't think that's what the spec states: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping

I don't quite get what the implementation should look like to you; can you provide sample code or a PR of how you would implement it?

Member Author:

In particular:

Every implementation SHOULD provide a default "Key Mapper" implementation that maps the Partitioning partitionkey attribute value to the 'key' of the Kafka message as-is, if present.

It's named partitionkey, but in fact we're talking about the key of the message.

}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> configs) {
}
}
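
For reference, the alternative raised in the review thread above, deriving a partition id from the extension and the number of partitions instead of replacing the record key, might look roughly like the sketch below. It only illustrates the idea using the murmur2 hashing that Kafka's default partitioner applies to record keys; it is a hypothetical helper, not part of the merged change.

```java
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not part of this PR: map the partitionkey extension to a
// partition id the same way Kafka's default partitioner hashes record keys.
public class PartitionKeyPartitionSketch {

    static int partitionFor(String partitionKey, int numPartitions) {
        byte[] keyBytes = partitionKey.getBytes(StandardCharsets.UTF_8);
        // murmur2 + toPositive is the hashing Kafka's DefaultPartitioner uses for keyed records.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // e.g. an event with partitionkey "albalb" produced to a 6-partition topic
        System.out.println(partitionFor("albalb", 6));
    }
}
```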
kafka/src/test/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptorTest.java
@@ -0,0 +1,59 @@
package io.cloudevents.kafka;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.test.Data;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class PartitionKeyExtensionInterceptorTest {

@Test
public void testNoPartitionKeyAndNoOriginalKey() {
assertKey(
new ProducerRecord<>("aaa", Data.V1_MIN),
null
);
}

@Test
public void testNoPartitionKey() {
assertKey(
new ProducerRecord<>("aaa", "blabla", Data.V1_MIN),
"blabla"
);
}

@Test
public void testPartitionKeyAndNoOriginalKey() {
assertKey(
new ProducerRecord<>("aaa", CloudEventBuilder
.v1(Data.V1_MIN)
.withExtension(PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION, "albalb")
.build()
),
"albalb"
);
}

@Test
public void testPartitionKey() {
assertKey(
new ProducerRecord<>("aaa", "blabla", CloudEventBuilder
.v1(Data.V1_MIN)
.withExtension(PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION, "albalb")
.build()
),
"albalb"
);
}

private void assertKey(ProducerRecord<Object, CloudEvent> record, Object expectedKey) {
PartitionKeyExtensionInterceptor interceptor = new PartitionKeyExtensionInterceptor();
assertThat(interceptor.onSend(record).key())
.isEqualTo(expectedKey);

Review comment:

It should validate against a calculated partition-id and not change the key.

}

}