Skip to content

Deprecate (Reactive)PulsarListenerEndpointAdapter #481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void configureEndpoint(AbstractReactivePulsarListenerEndpoint<T> aplEndp

@Override
public DefaultReactivePulsarMessageListenerContainer<T> createContainer(String... topics) {
ReactivePulsarListenerEndpoint<T> endpoint = new ReactivePulsarListenerEndpointAdapter<>() {
ReactivePulsarListenerEndpoint<T> endpoint = new ReactivePulsarListenerEndpoint<>() {

@Override
public List<String> getTopics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@
* @param <T> Message payload type.
* @author Christophe Bornet
* @author Chris Bono
* @author Vedran Pavic
*/
public interface ReactivePulsarListenerEndpoint<T> extends ListenerEndpoint<ReactivePulsarMessageListenerContainer<T>> {

boolean isFluxListener();
default boolean isFluxListener() {
return false;
}

@Nullable
Boolean getUseKeyOrderedProcessing();
default Boolean getUseKeyOrderedProcessing() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
*
* @param <T> Message payload type.
* @author Christophe Bornet
* @deprecated for removal in favor of {@link ReactivePulsarListenerEndpoint}
*/
@Deprecated(forRemoval = true)
public class ReactivePulsarListenerEndpointAdapter<T> implements ReactivePulsarListenerEndpoint<T> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void setConcurrency(Integer concurrency) {

@Override
public ConcurrentPulsarMessageListenerContainer<T> createContainer(String... topics) {
PulsarListenerEndpoint endpoint = new PulsarListenerEndpointAdapter() {
PulsarListenerEndpoint endpoint = new PulsarListenerEndpoint() {

@Override
public Collection<String> getTopics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.pulsar.config;

import java.util.Collection;
import java.util.Collections;

import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
Expand All @@ -32,6 +33,7 @@
*
* @param <C> Message listener container type.
* @author Christophe Bornet
* @author Vedran Pavic
*/
public interface ListenerEndpoint<C extends MessageListenerContainer> {

Expand All @@ -42,53 +44,69 @@ public interface ListenerEndpoint<C extends MessageListenerContainer> {
* @see ListenerContainerFactory#createListenerContainer
*/
@Nullable
String getId();
default String getId() {
return null;
}

/**
* Return the subscription name for this endpoint's container.
* @return the subscription name.
*/
@Nullable
String getSubscriptionName();
default String getSubscriptionName() {
return null;
}

/**
* Return the subscription type for this endpoint's container.
* @return the subscription type.
*/
@Nullable
SubscriptionType getSubscriptionType();
default SubscriptionType getSubscriptionType() {
return SubscriptionType.Exclusive;
}

/**
* Return the topics for this endpoint's container.
* @return the topics.
*/
Collection<String> getTopics();
default Collection<String> getTopics() {
return Collections.emptyList();
}

/**
* Return the topic pattern for this endpoint's container.
* @return the topic pattern.
*/
String getTopicPattern();
default String getTopicPattern() {
return null;
}

/**
* Return the autoStartup for this endpoint's container.
* @return the autoStartup.
*/
@Nullable
Boolean getAutoStartup();
default Boolean getAutoStartup() {
return null;
}

/**
* Return the schema type for this endpoint's container.
* @return the schema type.
*/
SchemaType getSchemaType();
default SchemaType getSchemaType() {
return null;
}

/**
* Return the concurrency for this endpoint's container.
* @return the concurrency.
*/
@Nullable
Integer getConcurrency();
default Integer getConcurrency() {
return null;
}

/**
* Setup the specified message listener container with the model defined by this
Expand All @@ -101,6 +119,7 @@ public interface ListenerEndpoint<C extends MessageListenerContainer> {
* @param listenerContainer the listener container to configure
* @param messageConverter the message converter - can be null
*/
void setupListenerContainer(C listenerContainer, @Nullable MessageConverter messageConverter);
default void setupListenerContainer(C listenerContainer, @Nullable MessageConverter messageConverter) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,20 @@
*
* @author Soby Chacko
* @author Alexander Preuß
* @author Vedran Pavic
*/
public interface PulsarListenerEndpoint extends ListenerEndpoint<PulsarMessageListenerContainer> {

boolean isBatchListener();
default boolean isBatchListener() {
return false;
}

Properties getConsumerProperties();
default Properties getConsumerProperties() {
return null;
}

AckMode getAckMode();
default AckMode getAckMode() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
*
* @author Soby Chacko
* @author Alexander Preuß
* @deprecated for removal in favor of {@link PulsarListenerEndpoint}
*/
@Deprecated(forRemoval = true)
public class PulsarListenerEndpointAdapter implements PulsarListenerEndpoint {

@Override
Expand Down