Skip to content

Commit ff2ca27

Browse files
committed
feat(impl): Support interception of output record before serialization
Fixes #147. Introduction of a new `OutputRecordInterceptor` interface to be implemented to be able to intercept outgoing messages before their payload is serialized. Its aim is to replace the `ProducerOnSendInterceptor` that at the difference with `OutputRecordInterceptor` is called after the serialization of the payload. Henceforth `ProducerOnSendInterceptor` is deprecated. Version is bumped to 4.1 to reflect the new feature. It is backwards-compatible. The TracingProducerInterceptor has been rewritten as a TracingOutputRecordInterceptor, but kept for backwards-compatibility. It is itself marked for removal as well. Documentation updated accordingly.
1 parent e6896ca commit ff2ca27

File tree

33 files changed

+784
-31
lines changed

33 files changed

+784
-31
lines changed

.github/project.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
release:
22
current-version: "4.0.3"
3-
next-version: "4.0.0-SNAPSHOT"
3+
next-version: "4.1.0-SNAPSHOT"

api/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
1010
<name>quarkus-kafka-streams-processor-api</name>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord;
2+
3+
import org.apache.kafka.streams.processor.api.Record;
4+
5+
/**
6+
* Interceptor that if implemented is called whenever the processor calls any of
7+
* {@link org.apache.kafka.streams.processor.api.ProcessorContext}'s forward methods.
8+
* <p>
9+
* Order of execution is guaranteed based on the integer priority returned by {@link #priority()}.
10+
* </p>
11+
* <p>
12+
* It differs from {@link io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor} in that
13+
* it is executed before any serialization of the payload to byte[] happens.
14+
* </p>
15+
*/
16+
public interface OutputRecordInterceptor {
17+
/**
18+
* By default, if not overriden, the interceptor has the following priority.
19+
*/
20+
int DEFAULT_PRIORITY = 100;
21+
22+
/**
23+
* Override this method to finely tune the order of execution of any interceptor you implement.
24+
*
25+
* @return the custom priority you want to assign. A number between 0 and {@link Integer#MAX_VALUE}.
26+
*/
27+
default int priority() {
28+
return DEFAULT_PRIORITY;
29+
}
30+
31+
/**
32+
* Intercept the record before it is eventually given to
33+
* {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record, String)}.
34+
*
35+
* @param record the record has the processor requested it to be forwarded
36+
* @return the new record with any modifications this interceptor wants to apply before serialization.
37+
*/
38+
Record interceptOutputRecord(Record record);
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*-
2+
* #%L
3+
* Quarkus Kafka Streams Processor
4+
* %%
5+
* Copyright (C) 2024 Amadeus s.a.s.
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord;
21+
22+
/**
23+
* Priorities of the output record interceptors the framework provides.
24+
*/
25+
public final class OutputRecordInterceptorPriorities {
26+
/**
27+
* Priority of the interceptor that will inject the tracing headers for propagation.
28+
*/
29+
public static final int TRACING = 100;
30+
31+
private OutputRecordInterceptorPriorities() {
32+
33+
}
34+
}

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/AbstractProcessorDecorator.java

-4
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@
3939
* Class introduced in 2.0, for compatibility with Quarkus 3.8 random failure to start when using custom processor
4040
* decorators.
4141
* </p>
42-
*
43-
* @deprecated It will be removed in 3.0, with the integration of Quarkus 3.15 where we will be able to go back to pure
44-
* CDI decorators.
4542
*/
46-
@Deprecated(forRemoval = true, since = "2.0")
4743
public abstract class AbstractProcessorDecorator implements Processor {
4844
/**
4945
* The decorated processor, holding either the next decorator layer or the final processor.

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/ProcessorDecoratorPriorities.java

+11
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,36 @@ public final class ProcessorDecoratorPriorities {
5858
* {@link ContextualProcessor#process(Record)} method.
5959
*/
6060
public static final int TRACING = 100;
61+
6162
/**
6263
* Priority of the decorator in charge or initializing a "request context" for the duration of the processing of the
6364
* ContextualProcessor#process(Record)} method. It is closed afterward.
6465
*/
6566
public static final int CDI_REQUEST_SCOPE = 200;
67+
68+
/**
69+
* Priority for the decorator that wraps the {@link org.apache.kafka.streams.processor.api.ProcessorContext} to
70+
* intercept calls to its <code>forward</code> methods.
71+
*/
72+
public static final int CONTEXT_FORWARD = 250;
73+
6674
/**
6775
* Priority of the decorator that will handle exception and potentially redirect the message in a dead letter queue
6876
* topic, if configured.
6977
*/
7078
public static final int DLQ = 300;
79+
7180
/**
7281
* Priority of the decorator in charge of measuring the processing time and the number of exceptions thrown.
7382
*/
7483
public static final int METRICS = 400;
84+
7585
/**
7686
* Priority of the decorator in charge of injecting all {@link DecoratedPunctuator} configured by the framework and
7787
* your custom potential additions.
7888
*/
7989
public static final int PUNCTUATOR_DECORATION = 500;
90+
8091
/**
8192
* Priority of the decorator in charge of implementing a form of fault tolerance by means of calling again the
8293
* {@link ContextualProcessor#process(Record)} method.

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java

+4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121

2222
/**
2323
* Priorities of the producer interceptors the framework provides.
24+
*
25+
* @deprecated Change any producer interceptor into a
26+
* {@link io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord.OutputRecordInterceptor}.
2427
*/
28+
@Deprecated(forRemoval = true, since = "4.1")
2529
public final class ProducerInterceptorPriorities {
2630
/**
2731
* Priority of the interceptor that will inject the tracing headers for propagation.

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerOnSendInterceptor.java

+4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626

2727
/**
2828
* Interface to extend to by able to decorate the production of the response message to the outgoing topic.
29+
*
30+
* @deprecated Change any producer interceptor into a
31+
* {@link io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord.OutputRecordInterceptor}.
2932
*/
33+
@Deprecated(forRemoval = true, since = "4.1")
3034
public interface ProducerOnSendInterceptor extends ProducerInterceptor<byte[], byte[]> {
3135
/**
3236
* By default, if not overriden, the interceptor has the following priority.

bom/application/pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
1010
<name>quarkus-kafka-streams-processor-bom</name>
1111
<packaging>pom</packaging>
1212
<properties>
13-
<quarkus-kafka-streams-processor.version>4.0.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
13+
<quarkus-kafka-streams-processor.version>4.1.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
1414
</properties>
1515
<dependencyManagement>
1616
<dependencies>

bom/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
1010
<name>quarkus-kafka-streams-processor-bom-parent</name>

bom/test/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
1010
<name>quarkus-kafka-streams-processor-test-bom</name>
@@ -19,7 +19,7 @@
1919
<dependency>
2020
<groupId>org.hamcrest</groupId>
2121
<artifactId>hamcrest</artifactId>
22-
<version>2.2</version>
22+
<version>3.0</version>
2323
</dependency>
2424
<dependency>
2525
<groupId>com.spotify</groupId>
@@ -89,7 +89,7 @@
8989
<dependency>
9090
<groupId>org.assertj</groupId>
9191
<artifactId>assertj-core</artifactId>
92-
<version>3.26.3</version>
92+
<version>3.27.3</version>
9393
</dependency>
9494
<!-- Mockserver -->
9595
<dependency>

deployment/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-deployment</artifactId>
1010
<name>quarkus-kafka-streams-processor-deployment</name>

docs/modules/ROOT/pages/index.adoc

+52-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ For keys, only type `String` is supported.
120120
For values, see <<_data_serialization, supported types>>.
121121
<3> The `process` method is called for each incoming message.
122122
Any processing must be done synchronously.
123-
<4> The `forward` method allows to produce an outgoing message.
123+
<4> The `outputrecord` method allows to produce an outgoing message.
124124
Internally Kafka Streams links the acknowledgment on the outgoing message with the commit of the associated incoming message.
125125
126126
=== Topology builder
@@ -745,6 +745,11 @@ The priority is to be set based on the priorities of the existing decorators whi
745745
* your custom potential additions.
746746
*/
747747
public static final int PUNCTUATOR_DECORATION = 500;
748+
/**
749+
* Priority for the decorator that wraps the {@link org.apache.kafka.streams.processor.api.ProcessorContext} to
750+
* intercept calls to its <code>forward</code> methods.
751+
*/
752+
public static final int CONTEXT_FORWARD = 550;
748753
/**
749754
* Priority of the decorator in charge of implementing a form of fault tolerance by means of calling again the
750755
* {@link ContextualProcessor#process(Record)} method.
@@ -757,7 +762,52 @@ The priority is to be set based on the priorities of the existing decorators whi
757762
Such a decorator will automatically been taken into account by CDI.
758763
The priority will control at which point your decorator will be called among all other decorators.
759764

760-
=== Producer interceptor
765+
[#_output_record_interceptor]
766+
=== Output record interceptor
767+
768+
A `Processor` usually ends with the forwarding of an outgoing message.
769+
For that purpose, the `ProcessorContext#foward` methods are to be used.
770+
771+
From `quarkus-kafka-streams-processor` 4.1, calls to those methods can be intercepted.
772+
If you define a bean of type `OutputRecordInterceptor`, it will be taken up by the framework and executed.
773+
There is an order to multiple instances of interceptors, driven by the value returned by `OutputRecordInterceptor#priority`.
774+
775+
Example of interceptor:
776+
777+
[source,java]
778+
----
779+
@ApplicationScoped
780+
public class MyOutputRecordInterceptor implements OutputRecordInterceptor {
781+
@Override
782+
public Record interceptOutputRecord(Record record) {
783+
record.headers().add("MyHeader", "MyValue".getBytes(StandardCharsets.UTF_8));
784+
return record;
785+
}
786+
787+
@Override
788+
public int priority() {
789+
return 150; // <1>
790+
}
791+
}
792+
----
793+
794+
<1> Priority of this interceptor.
795+
Default priority is 100.
796+
References for priorities of existing interceptors this library provides:
797+
+
798+
[source,java]
799+
----
800+
/**
801+
* Priority of the interceptor that will inject the tracing headers for propagation.
802+
*/
803+
public static final int TRACING = 100;
804+
----
805+
+
806+
Lower priority are executed first.
807+
808+
=== Producer interceptor [deprecated]
809+
810+
WARNING: Producer interceptors are from 4.1 deprecated. They should be reimplemented as xref:_output_record_interceptor[OutputRecordInterceptor].
761811

762812
Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
763813
But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation.

docs/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
<relativePath>../pom.xml</relativePath>
99
</parent>
1010

impl/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>
1010
<name>quarkus-kafka-streams-processor-impl</name>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*-
2+
* #%L
3+
* Quarkus Kafka Streams Processor
4+
* %%
5+
* Copyright (C) 2024 Amadeus s.a.s.
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.outputrecord;
21+
22+
import jakarta.enterprise.context.ApplicationScoped;
23+
import jakarta.inject.Inject;
24+
25+
import org.apache.kafka.streams.processor.api.Record;
26+
27+
import io.opentelemetry.api.OpenTelemetry;
28+
import io.opentelemetry.context.Context;
29+
import io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord.OutputRecordInterceptor;
30+
import io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord.OutputRecordInterceptorPriorities;
31+
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;
32+
33+
/**
34+
* Producer interceptor that injects the tracing headers for propagation.
35+
*/
36+
@ApplicationScoped
37+
public class TracingOutputRecordInterceptor implements OutputRecordInterceptor {
38+
private final OpenTelemetry openTelemetry;
39+
40+
private final KafkaTextMapSetter kafkaTextMapSetter;
41+
42+
@Inject
43+
public TracingOutputRecordInterceptor(OpenTelemetry openTelemetry, KafkaTextMapSetter kafkaTextMapSetter) {
44+
this.openTelemetry = openTelemetry;
45+
this.kafkaTextMapSetter = kafkaTextMapSetter;
46+
}
47+
48+
@Override
49+
public Record interceptOutputRecord(Record record) {
50+
openTelemetry.getPropagators().getTextMapPropagator().fields().forEach(record.headers()::remove);
51+
openTelemetry.getPropagators()
52+
.getTextMapPropagator()
53+
.inject(Context.current(), record.headers(), kafkaTextMapSetter);
54+
return record;
55+
}
56+
57+
@Override
58+
public int priority() {
59+
return OutputRecordInterceptorPriorities.TRACING;
60+
}
61+
}

0 commit comments

Comments
 (0)