Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions binders/kafka-binder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,46 +123,6 @@
</build>
</profile>
</profiles>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
</pluginRepository>
</pluginRepositories>
<reporting>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -70,12 +71,12 @@
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.core.retry.RetryOperations;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
Expand All @@ -94,6 +95,7 @@
* @author Omer Celik
* @author Byungjun You
* @author Roman Akentev
* @author Artem Bilan
*/
public class KafkaTopicProvisioner implements
// @checkstyle:off
Expand Down Expand Up @@ -211,18 +213,13 @@ public void setMetadataRetryOperations(RetryOperations metadataRetryOperations)
@Override
public void afterPropertiesSet() {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();

SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(10);
retryTemplate.setRetryPolicy(simpleRetryPolicy);

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.metadataRetryOperations = retryTemplate;
RetryPolicy retryPolicy = RetryPolicy.builder()
.maxAttempts(10)
.delay(Duration.ofMillis(100))
.multiplier(2)
.maxDelay(Duration.ofSeconds(1))
.build();
this.metadataRetryOperations = new RetryTemplate(retryPolicy);
}
}

Expand Down Expand Up @@ -311,21 +308,21 @@ private int getPartitionsForTopic(String topicName, AdminClient adminClient) {
}

private Map<String, TopicDescription> retrieveTopicDescriptions(String topicName, AdminClient adminClient) {
return this.metadataRetryOperations.execute(context -> {
try {
try {
return this.metadataRetryOperations.execute(() -> {

if (logger.isDebugEnabled()) {
logger.debug("Attempting to retrieve the description for the topic: " + topicName);
}
DescribeTopicsResult describeTopicsResult = adminClient
.describeTopics(Collections.singletonList(topicName));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult
.allTopicNames();
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.allTopicNames();
return all.get(this.operationTimeout, TimeUnit.SECONDS);
}
catch (Exception ex) {
throw new ProvisioningException("Problems encountered with partitions finding for: " + topicName, ex);
}
});
});
}
catch (RetryException ex) {
throw new ProvisioningException("Problems encountered with partitions finding for: " + topicName, ex);
}
}

AdminClient createAdminClient() {
Expand Down Expand Up @@ -505,7 +502,7 @@ else if (tolerateLowerPartitionsOnBroker) {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(
this.configurationProperties.getMinPartitionCount(), partitionCount);
this.metadataRetryOperations.execute((context) -> {
this.metadataRetryOperations.execute(() -> {

NewTopic newTopic;
Map<Integer, List<Integer>> replicasAssignments = topicProperties
Expand Down Expand Up @@ -660,7 +657,7 @@ public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount,
final boolean tolerateLowerPartitionsOnBroker,
final Callable<Collection<PartitionInfo>> callable, final String topicName) {
try {
return this.metadataRetryOperations.execute((context) -> {
return this.metadataRetryOperations.execute(() -> {
Collection<PartitionInfo> partitions = Collections.emptyList();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

/**
Expand Down Expand Up @@ -75,7 +75,6 @@ public GlobalKTableBinder(
}

@Override
@SuppressWarnings("unchecked")
protected Binding<GlobalKTable<Object, Object>> doBindConsumer(String name,
String group, GlobalKTable<Object, Object> inputTarget,
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.stream.binder.kafka.streams;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -38,23 +39,24 @@
import org.apache.kafka.streams.state.QueryableStoreType;

import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/**
* Services pertinent to the interactive query capabilities of Kafka Streams. This class
* provides services such as querying for a particular store, which instance is hosting a
* particular store etc. This is part of the public API of the kafka streams binder and
* particular store etc. This is part of the public API of the kafka streams binder, and
* the users can inject this service in their applications to make use of it.
*
* @author Soby Chacko
* @author Renwei Han
* @author Serhii Siryi
* @author Nico Pommerening
* @author Chris Bono
* @author Artem Bilan
* @since 2.1.0
*/
public class InteractiveQueryService {
Expand Down Expand Up @@ -100,58 +102,64 @@ public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType

AtomicReference<StoreQueryParameters<T>> storeQueryParametersAtomicReference = new AtomicReference<>(storeQueryParams);

return getRetryTemplate().execute(context -> {
T store = null;
Throwable throwable = null;
if (contextSpecificKafkaStreams != null) {
try {
store = contextSpecificKafkaStreams.store(storeQueryParametersAtomicReference.get());
}
catch (InvalidStateStoreException e) {
throwable = e;
try {
return getRetryTemplate().execute(() -> {
T store = null;
Throwable throwable = null;
if (contextSpecificKafkaStreams != null) {
try {
store = contextSpecificKafkaStreams.store(storeQueryParametersAtomicReference.get());
}
catch (InvalidStateStoreException e) {
throwable = e;
}
}
}
if (store != null) {
return store;
}
if (contextSpecificKafkaStreams != null) {
LOG.warn("Store (" + storeName + ") could not be found in Streams context, falling back to all known Streams instances");
}

// Find all apps that know about the store
Map<KafkaStreams, T> candidateStores = new HashMap<>();
for (KafkaStreams kafkaStreamApp : kafkaStreamsRegistry.getKafkaStreams()) {
try {
candidateStores.put(kafkaStreamApp, kafkaStreamApp.store(storeQueryParametersAtomicReference.get()));
if (store != null) {
return store;
}
catch (Exception ex) {
throwable = ex;
if (contextSpecificKafkaStreams != null) {
LOG.warn("Store (" + storeName + ") could not be found in Streams context, falling back to all known Streams instances");
}
}

// Store exists in a single app - no further resolution required
if (candidateStores.size() == 1) {
return candidateStores.values().stream().findFirst().get();
}

// If the store is in multiple streams apps - discard any apps that do not actually have the store
if (candidateStores.size() > 1) {

candidateStores = candidateStores.entrySet().stream()
.filter((e) -> this.topologyInfoFacade.streamsAppActuallyHasStore(e.getKey(), storeName))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Find all apps that know about the store
Map<KafkaStreams, T> candidateStores = new HashMap<>();
for (KafkaStreams kafkaStreamApp : kafkaStreamsRegistry.getKafkaStreams()) {
try {
candidateStores.put(kafkaStreamApp, kafkaStreamApp.store(storeQueryParametersAtomicReference.get()));
}
catch (Exception ex) {
throwable = ex;
}
}

// Store exists in a single app - no further resolution required
if (candidateStores.size() == 1) {
return candidateStores.values().stream().findFirst().get();
}

throwable = (candidateStores.size() == 0) ?
new UnknownStateStoreException("Store (" + storeName + ") not available to Streams instance") :
new InvalidStateStoreException("Store (" + storeName + ") available to more than one Streams instance");
// If the store is in multiple streams apps - discard any apps that do not actually have the store
if (candidateStores.size() > 1) {

candidateStores = candidateStores.entrySet().stream()
.filter((e) -> this.topologyInfoFacade.streamsAppActuallyHasStore(e.getKey(), storeName))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (candidateStores.size() == 1) {
return candidateStores.values().stream().findFirst().get();
}

}
throw new IllegalStateException("Error retrieving state store: " + storeName, throwable);
});
throwable = (candidateStores.isEmpty()) ?
new UnknownStateStoreException("Store (" + storeName + ") not available to Streams instance") :
new InvalidStateStoreException("Store (" + storeName + ") available to more than one Streams instance");

}
throw new IllegalStateException("Error retrieving state store: " + storeName, throwable);
});
}
catch (RetryException ex) {
ReflectionUtils.rethrowRuntimeException(ex.getCause());
return null;
}
}

/**
Expand Down Expand Up @@ -218,38 +226,40 @@ public HostInfo getCurrentHostInfo() {
public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
final RetryTemplate retryTemplate = getRetryTemplate();


return retryTemplate.execute(context -> {
Throwable throwable = null;
try {
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
.stream()
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
if (keyQueryMetadata != null) {
return keyQueryMetadata.activeHost();
try {
return retryTemplate.execute(() -> {
Throwable throwable = null;
try {
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
.stream()
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
if (keyQueryMetadata != null) {
return keyQueryMetadata.activeHost();
}
}
catch (Exception e) {
throwable = e;
}
}
catch (Exception e) {
throwable = e;
}
throw new IllegalStateException(
"Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
});
throw new IllegalStateException(
"Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
});
}
catch (RetryException ex) {
ReflectionUtils.rethrowRuntimeException(ex.getCause());
return null;
}
}

private RetryTemplate getRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();

KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());

retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
var stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
RetryPolicy retryPolicy = RetryPolicy.builder()
.maxAttempts(stateStoreRetry.getMaxAttempts())
.delay(Duration.ofMillis(stateStoreRetry.getBackoffPeriod()))
.build();

return retryTemplate;
return new RetryTemplate(retryPolicy);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.StringUtils;

/**
Expand Down
Loading
Loading