Skip to content

Commit

Permalink
Merge pull request #6 from oyvindstegard/master
Browse files Browse the repository at this point in the history
Lift project to Kafka clients 3.6, Spring Kafka 3.1 and Spring Boot 3.2
  • Loading branch information
V1adau authored Nov 30, 2023
2 parents ce746f1 + 9b4e162 commit d2297dc
Show file tree
Hide file tree
Showing 20 changed files with 415 additions and 410 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ target/
/.idea/
*.iml
sequence-producer.state
dependency-reduced-pom.xml

315 changes: 163 additions & 152 deletions README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions boot-app
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ set -e
cd "$(dirname "$0")"

if ! test -f messages/target/messages-*.jar -a\
-f clients-spring/target/clients-spring-*-exec.jar; then
-f clients-spring/target/clients-spring-*.jar; then
mvn -B install
fi

cd clients-spring
exec java -jar target/clients-spring-*-exec.jar "$@"
exec java -jar target/clients-spring-*.jar "$@"

36 changes: 0 additions & 36 deletions clients-spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,9 @@
<groupId>no.nav.kafka</groupId>
<artifactId>kafka-sandbox</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>clients-spring</artifactId>
<packaging>jar</packaging>

<properties>
<spring-boot.version>2.6.1</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
Expand Down Expand Up @@ -56,38 +38,20 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>exec</classifier>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static class AverageRatioRandom<T> implements FailureRateStrategy<T> {

public AverageRatioRandom(float failureRate) {
if (failureRate < 0 || failureRate > 1.0) {
throw new IllegalArgumentException("failure rate must be a decmial number between 0.0 and 1.0");
throw new IllegalArgumentException("failure rate must be a decimal number between 0.0 and 1.0");
}
this.failureRate = failureRate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ public EventStore<SensorEvent> sensorEventStore(@Value("${measurements.event-sto
public ConcurrentKafkaListenerContainerFactory<String, SensorEvent> measurementsListenerContainer(
ConsumerFactory<String, SensorEvent> consumerFactory,
Optional<CommonErrorHandler> errorHandler,
Optional<BatchErrorHandler> legacyErrorHandler, // just temporary, since we still have legacy error handlers in config
@Value("${measurements.consumer.handle-deserialization-error:true}") boolean handleDeserializationError) {

// Consumer configuration from application.yml, where we will override some properties:
// Consumer configuration from application.yml, where we will override some properties here:
Map<String, Object> externalConfigConsumerProps = new HashMap<>(consumerFactory.getConfigurationProperties());

ConcurrentKafkaListenerContainerFactory<String, SensorEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
Expand All @@ -99,11 +98,8 @@ public ConcurrentKafkaListenerContainerFactory<String, SensorEvent> measurements
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);

if (errorHandler.isPresent()) {
LOG.info("Using error handler: {}", errorHandler.get().getClass().getSimpleName());
LOG.info("Using error handler: {}", errorHandler.map(h -> h.getClass().getSimpleName()).orElse("none"));
factory.setCommonErrorHandler(errorHandler.get());
} else if (legacyErrorHandler.isPresent()) {
LOG.info("Using legacy error handler: {}", legacyErrorHandler.get().getClass().getSimpleName());
factory.setBatchErrorHandler(legacyErrorHandler.get());
} else {
LOG.info("Using Spring Kafka default error handler");
}
Expand All @@ -122,8 +118,8 @@ private DefaultKafkaConsumerFactory<String, SensorEvent> consumerFactory(
boolean handleDeserializationError) {
// override some consumer props from external config
Map<String, Object> consumerProps = new HashMap<>(externalConfigConsumerProps);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "spring-web-measurement");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-web-measurement");
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "boot-app-measurement");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "boot-app-measurement");

// Deserialization config
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand All @@ -139,7 +135,7 @@ private DefaultKafkaConsumerFactory<String, SensorEvent> consumerFactory(
}

@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "ignore")
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "log-and-ignore")
public CommonErrorHandler ignoreHandler() {
return new CommonLoggingErrorHandler();
}
Expand All @@ -150,44 +146,28 @@ public CommonErrorHandler infiniteRetryHandler() {
return new DefaultErrorHandler(new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS));
}

// TODO: upgrade to non-deprecated common class of error handlers with similar behaviour:
@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "seek-to-current")
public BatchErrorHandler seekToCurrentHandler() {
return new SeekToCurrentBatchErrorHandler();
}

@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "seek-to-current-with-backoff")
public BatchErrorHandler seekToCurrentWithBackoffHandler() {
SeekToCurrentBatchErrorHandler handler = new SeekToCurrentBatchErrorHandler();
// For this error handler, max attempts actually does not matter
handler.setBackOff(new FixedBackOff(2000L, 2));
return handler;
}

@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "retry-with-backoff")
public BatchErrorHandler retryWithBackoffHandler() {
return new RetryingBatchErrorHandler(new FixedBackOff(2000L, 2), null);
public CommonErrorHandler retryWithBackoffHandler() {
return new DefaultErrorHandler(new FixedBackOff(2000L, 2));
}

@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "retry-with-backoff-recovery")
public BatchErrorHandler retryWithBackoffRecoveryHandler(EventStore<SensorEvent> eventStore) {
public CommonErrorHandler retryWithBackoffRecoveryHandler(EventStore<SensorEvent> eventStore) {
return new RetryingErrorHandler(eventStore);
}

@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "recovering")
public BatchErrorHandler recoveringHandler() {
public CommonErrorHandler recoveringHandler() {
return new RecoveringErrorHandler();
}

@Bean
@ConditionalOnProperty(value = "measurements.consumer.error-handler", havingValue = "stop-container")
public BatchErrorHandler containerStoppingHandler() {
return new ContainerStoppingBatchErrorHandler();
public CommonErrorHandler containerStoppingHandler() {
return new CommonContainerStoppingErrorHandler();
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package no.nav.kafka.sandbox.measurements;

import no.nav.kafka.sandbox.data.EventStore;
import no.nav.kafka.sandbox.measurements.errorhandlers.RecoveringErrorHandler;
import no.nav.kafka.sandbox.messages.Measurements;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -40,14 +38,14 @@ public class MeasurementsConsumer {

private final long slowdownMillis;

private final boolean usingRecoveringBatchErrorHandler;
private final boolean useBatchListenerFailedException;

public MeasurementsConsumer(EventStore<Measurements.SensorEvent> store,
@Value("${measurements.consumer.slowdown:0}") long slowdownMillis,
Optional<BatchErrorHandler> errorHandler) {
@Value("${measurements.consumer.useBatchListenerFailedException:false}") boolean useBatchListenerFailedException) {
this.eventStore = store;
this.slowdownMillis = slowdownMillis;
this.usingRecoveringBatchErrorHandler = errorHandler.isPresent() && errorHandler.get() instanceof RecoveringErrorHandler;
this.useBatchListenerFailedException = useBatchListenerFailedException;
}

/**
Expand All @@ -71,8 +69,8 @@ public void receive(List<ConsumerRecord<String, Measurements.SensorEvent>> recor
NullPointerException businessException = new NullPointerException("Message at "
+ record.topic() + "-" + record.partition() + ":" + record.offset() + " with key " + record.key() + " was null");

if (usingRecoveringBatchErrorHandler) {
// Communicate to recovering batch error handler which record in the batch that failed, and the root cause
if (useBatchListenerFailedException) {
// Communicate to error handler which record in the batch that failed, and the root cause
throw new BatchListenerFailedException(businessException.getMessage(), businessException, record);
} else {
// Throw raw root cause for other types of error handling
Expand All @@ -83,7 +81,8 @@ public void receive(List<ConsumerRecord<String, Measurements.SensorEvent>> recor
try {
eventStore.storeEvent(record.value());
} catch (Exception e) {
if (usingRecoveringBatchErrorHandler) {
if (useBatchListenerFailedException) {
// Communicate to error handler which record in the batch that failed, and the root cause
throw new BatchListenerFailedException(e.getMessage(), e, record);
} else {
throw e;
Expand All @@ -103,7 +102,7 @@ private boolean checkFailedDeserialization(ConsumerRecord<String, Measurements.S


private static Optional<Throwable> failedValueDeserialization(ConsumerRecord<String, Measurements.SensorEvent> record) {
Header valueDeserializationError = record.headers().lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
Header valueDeserializationError = record.headers().lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
if (valueDeserializationError != null) {
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(valueDeserializationError.value()))){
DeserializationException dex = (DeserializationException)ois.readObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.util.Collections;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.stream.Collectors;

@RestController
public class MeasurementsRestController {
Expand All @@ -27,11 +25,12 @@ public MeasurementsRestController(EventStore<Measurements.SensorEvent> sensorEve
* @return messages from most oldest to most recent, optionally filtering by timestamp.
*/
@GetMapping(path = "/measurements/api", produces = MediaType.APPLICATION_JSON_VALUE)
public List<Measurements.SensorEvent> getMeasurements(@RequestParam(value = "after", required = false, defaultValue = "1970-01-01T00:00")
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime after) {
public List<Measurements.SensorEvent> getMeasurements(@RequestParam(value = "after", required = false, defaultValue = "1970-01-01T00:00Z")
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime after) {

return eventStore.fetchEvents().stream()
.filter(e -> e.getTimestamp().isAfter(after))
.collect(Collectors.toList());
.toList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class RecoveringErrorHandler extends RecoveringBatchErrorHandler {
/**
* This error handler does not recover anything more than exactly failed records
*/
public class RecoveringErrorHandler extends DefaultErrorHandler {

private static final Logger LOG = LoggerFactory.getLogger(RecoveringErrorHandler.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
import no.nav.kafka.sandbox.messages.Measurements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

import java.io.IOException;

public class RetryingErrorHandler extends RetryingBatchErrorHandler {
/**
* Error handler with access to event store, tries to recover records by writing to store, under certain
* conditions.
*/
public class RetryingErrorHandler extends DefaultErrorHandler {

private static final Logger LOG = LoggerFactory.getLogger(RetryingErrorHandler.class);

public RetryingErrorHandler(EventStore<Measurements.SensorEvent> store) {
super(new FixedBackOff(2000L, 2), (record, exception) -> {
super((record, exception) -> {
Throwable cause = exception;
if (exception instanceof ListenerExecutionFailedException) {
cause = exception.getCause();
Expand Down Expand Up @@ -51,7 +55,7 @@ public RetryingErrorHandler(EventStore<Measurements.SensorEvent> store) {
// Depending on business requirements (e.g. if not at-least-once semantics), then another strategy might
// be to skip the whole batch, let Spring commit offsets and continue with the next instead.
throw new RuntimeException("Unrecoverable batch error", cause);
});
}, new FixedBackOff(2000L, 2));
}

}
23 changes: 13 additions & 10 deletions clients-spring/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
spring:
application:
name: Kafka sandbox web
name: Kafka sandbox boot-app
# See class org.springframework.boot.autoconfigure.kafka.KafkaProperties:
kafka:
bootstrap-servers: localhost:9092
consumer:
client-id: spring-web
group-id: ${GROUPID:spring-web}
client-id: boot-app
group-id: ${GROUPID:boot-app}
properties:
spring.json.trusted.packages: no.nav.kafka.sandbox.messages
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Expand Down Expand Up @@ -44,15 +44,18 @@ measurements:

# Select error handler:
# 'spring-default': just uses the Spring default for batch error handling (does not explicitly set an error handler).
# 'ignore': logs, but ignores all errors from consumer, implemented in Spring error handler CommonLoggingErrorHandler.
# 'log-and-ignore': logs, but ignores all errors from consumer, implemented in Spring error handler CommonLoggingErrorHandler.
# 'infinite-retry': tries failed batches an infinite number of times, with a backoff/delay between each attempt. Spring DefaultErrorHandler with a BackOff.
# 'seek-to-current': Spring SeekToCurrentBatchErrorHandler w/all defaults and no backoff (unlimited retries)
# 'seek-to-current-with-backoff': Spring SeekToCurrentBatchErrorHandler w/fixed delay and max 2 retries.
# 'retry-with-backoff': Spring RetryingBatchErrorHandler with no configured ConsumerRecordRecoverer
# 'retry-with-backoff-recovery': Spring RetryingBatchErrorHandler with custom ConsumerRecordRecoverer set, see RetryingErrorHandler.
# 'recovering': Spring RecoveringBatchErrorHandler, see RecoveringErrorHandler.
# 'stop-container': Spring ContainerStoppingBatchErrorHandler
# 'retry-with-backoff': Spring DefaultErrorHandler with 2 retry attempts
# 'retry-with-backoff-recovery': no.nav.k.s.m.e.RetryingErrorHandler with custom ConsumerRecordRecoverer set.
# 'recovering': no.nav.k.s.m.e.RecoveringErrorHandler
# 'stop-container': Spring CommonContainerStoppingErrorHandler
error-handler: spring-default

# Select whether consumer should throw BatchListenerFailedException w/cause when an internal processing failure occurs, or
# just directly throw any exception. Setting to true will allow Spring to detect where a failure occured in a batch of multiple
# records.
useBatchListenerFailedException: false

# Select whether deserialization exceptions of values should be handled:
handle-deserialization-error: true
Loading

0 comments on commit d2297dc

Please sign in to comment.