Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added simple handler for topic's internal retryable streams #371

Merged
merged 1 commit into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Added simple handler for topic's internal retryable streams
  • Loading branch information
alex268 committed Feb 10, 2025
commit eabb8fbbd91b79964ebae2bf8b02aa7947ee0a49
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public void allTypeInfiniteAndNan() {
BigDecimal scaledInf = new BigDecimal(inf, scale);
BigDecimal scaledNan = new BigDecimal(nan, scale);

System.out.println("Nan for " + scaled + " -> " + scaledNan);

assertIsInf(scaled.newValue(scaledInf));
assertIsNegInf(scaled.newValue(scaledInf.negate()));

Expand Down
12 changes: 10 additions & 2 deletions topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import org.slf4j.Logger;

Expand All @@ -29,12 +30,15 @@ public abstract class GrpcStreamRetrier {
protected final String id;
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
protected final AtomicBoolean isStopped = new AtomicBoolean(false);
private final ScheduledExecutorService scheduler;
protected final AtomicInteger reconnectCounter = new AtomicInteger(0);

protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
private final ScheduledExecutorService scheduler;
private final BiConsumer<Status, Throwable> errorsHandler;

protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer<Status, Throwable> errorsHandler) {
this.scheduler = scheduler;
this.id = generateRandomId(ID_LENGTH);
this.errorsHandler = errorsHandler;
}

protected abstract Logger getLogger();
Expand Down Expand Up @@ -127,6 +131,10 @@ protected void onSessionClosed(Status status, Throwable th) {
}
}

if (errorsHandler != null) {
errorsHandler.accept(status, th);
}

if (!isStopped.get()) {
tryScheduleReconnect();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
private final String consumerName;

public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
super(topicRpc.getScheduler());
super(topicRpc.getScheduler(), settings.getErrorsHandler());
this.topicRpc = topicRpc;
this.settings = settings;
this.session = new ReadSessionImpl();
Expand Down
15 changes: 15 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

import javax.annotation.Nullable;

import com.google.common.collect.ImmutableList;

import tech.ydb.core.Status;

/**
* @author Nikolay Perfilov
*/
Expand All @@ -19,13 +22,15 @@ public class ReaderSettings {
private final List<TopicReadSettings> topics;
private final long maxMemoryUsageBytes;
private final Executor decompressionExecutor;
private final BiConsumer<Status, Throwable> errorsHandler;

private ReaderSettings(Builder builder) {
this.consumerName = builder.consumerName;
this.readerName = builder.readerName;
this.topics = ImmutableList.copyOf(builder.topics);
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
this.decompressionExecutor = builder.decompressionExecutor;
this.errorsHandler = builder.errorsHandler;
}

public String getConsumerName() {
Expand All @@ -41,6 +46,10 @@ public List<TopicReadSettings> getTopics() {
return topics;
}

public BiConsumer<Status, Throwable> getErrorsHandler() {
return errorsHandler;
}

public long getMaxMemoryUsageBytes() {
return maxMemoryUsageBytes;
}
Expand All @@ -63,6 +72,7 @@ public static class Builder {
private List<TopicReadSettings> topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private Executor decompressionExecutor = null;
private BiConsumer<Status, Throwable> errorsHandler = null;

public Builder setConsumerName(String consumerName) {
this.consumerName = consumerName;
Expand Down Expand Up @@ -103,6 +113,11 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
return this;
}

public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
this.errorsHandler = handler;
return this;
}

/**
* Set executor for decompression tasks.
* If not set, default executor will be used.
Expand Down
16 changes: 15 additions & 1 deletion topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package tech.ydb.topic.settings;

import java.util.function.BiConsumer;

import tech.ydb.core.Status;
import tech.ydb.topic.description.Codec;

/**
Expand All @@ -16,6 +19,7 @@ public class WriterSettings {
private final Codec codec;
private final long maxSendBufferMemorySize;
private final int maxSendBufferMessagesCount;
private final BiConsumer<Status, Throwable> errorsHandler;

private WriterSettings(Builder builder) {
this.topicPath = builder.topicPath;
Expand All @@ -25,6 +29,7 @@ private WriterSettings(Builder builder) {
this.codec = builder.codec;
this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize;
this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount;
this.errorsHandler = builder.errorsHandler;
}

public static Builder newBuilder() {
Expand All @@ -43,6 +48,10 @@ public String getMessageGroupId() {
return messageGroupId;
}

public BiConsumer<Status, Throwable> getErrorsHandler() {
return errorsHandler;
}

public Long getPartitionId() {
return partitionId;
}
Expand Down Expand Up @@ -70,6 +79,7 @@ public static class Builder {
private Codec codec = Codec.GZIP;
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
private BiConsumer<Status, Throwable> errorsHandler = null;

/**
* Set path to a topic to write to
Expand Down Expand Up @@ -148,9 +158,13 @@ public Builder setMaxSendBufferMessagesCount(int maxMessagesCount) {
return this;
}

public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
this.errorsHandler = handler;
return this;
}

public WriterSettings build() {
return new WriterSettings(this);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;

public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
super(topicRpc.getScheduler());
super(topicRpc.getScheduler(), settings.getErrorsHandler());
this.topicRpc = topicRpc;
this.settings = settings;
this.session = new WriteSessionImpl();
Expand Down
Loading