Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not retry some exceptions #231

Merged
merged 14 commits into from
Jul 8, 2024
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2019-2024 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -177,11 +177,18 @@
String priorityGroup() default "";

/**
* The message batch size, Rqueue will try to pull batchSize message in one Redis call. By default
* it will try to pull one message but if concurrency is not reached then it will try to pull more
* than one messages.
* The message batch size, Rqueue will try to pull batchSize message in one Redis call. By
* default, it will try to pull one message, but if concurrency is not reached, then it will try to
* pull more than one message.
*
* @return batch size
*/
String batchSize() default "-1";

/**
* Exception types that are not retryable, defaults to empty.
*
* @return exceptions those will not be retried
*/
Class<? extends Throwable>[] doNotRetry() default {};
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 Sonu Kumar
* Copyright (c) 2020-2024 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand All @@ -16,18 +16,24 @@

package com.github.sonus21.rqueue.listener;

import static com.github.sonus21.rqueue.utils.Constants.ONE_MILLI;
import static com.github.sonus21.rqueue.utils.Constants.SECONDS_IN_A_MINUTE;

import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.slf4j.event.Level;
import org.springframework.messaging.MessageHeaders;

class DefaultRqueuePoller extends RqueueMessagePoller {

private Long lastNotAvailableAt;
private final QueueDetail queueDetail;
private final QueueThreadPool queueThreadPool;

Expand Down Expand Up @@ -70,18 +76,31 @@ void deactivate(int index, String queue, DeactivateType deactivateType) {
}
}

private void logNotAvailable() {
long maxNotAvailableDelay = 10 * SECONDS_IN_A_MINUTE * ONE_MILLI;
if (Objects.isNull(lastNotAvailableAt)) {
lastNotAvailableAt = System.currentTimeMillis();
} else if (System.currentTimeMillis() - lastNotAvailableAt > maxNotAvailableDelay) {
log(Level.ERROR, "deadlock?? frozen?? stuck?? No Threads are available in last {}",
null,
Duration.ofMillis(maxNotAvailableDelay));
}
log(Level.DEBUG, "No Threads are available sleeping {}Ms", null, pollingInterval);
}

void poll() {
if (!hasAvailableThreads(queueDetail, queueThreadPool)) {
log(Level.WARN, "No Threads are available sleeping {}Ms", null, pollingInterval);
logNotAvailable();
TimeoutUtils.sleepLog(pollingInterval, false);
} else {
super.poll(-1, queueDetail.getName(), queueDetail, queueThreadPool);
lastNotAvailableAt = null;
}
}

@Override
public void start() {
log(Level.DEBUG, "Running Queue {}", null, queueDetail.getName());
log(Level.INFO, "poll starting", null);
while (true) {
try {
if (eligibleForPolling(queueDetail.getName())) {
Expand All @@ -91,8 +110,8 @@ public void start() {
} else {
deactivate(-1, queueDetail.getName(), DeactivateType.NO_MESSAGE);
}
} catch (Exception e) {
log(Level.ERROR, "Error in poller", e);
} catch (Throwable e) {
log(Level.ERROR, "error in polling", e);
if (shouldExit()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private void saveMessageMetadata(Callable<Void> callable) {
void updateMessageStatus(MessageStatus messageStatus) {
setMessageStatus(messageStatus);
// We need to address these problems with message metadata
// 1. Message was deleted while executing, this means local copy is stale
// 1. The Message was deleted while executing; this means local copy is stale
// 2. Parallel update is being made [dashboard operation, periodic job (two periodic jobs can
// run in parallel due to failure)]
if (!messageStatus.isTerminalState() || getRqueueMessage().isPeriodic()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2019-2024 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@ class MappingInformation implements Comparable<MappingInformation> {
private final Map<String, Integer> priority;
private final boolean primary;
private final int batchSize;
private final Set<Class<? extends Throwable>> doNotRetry;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 Sonu Kumar
* Copyright (c) 2020-2024 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,10 @@ class PostProcessingHandler extends PrefixLogger {
this.rqueueSystemConfigDao = rqueueSystemConfigDao;
}

void handle(JobImpl job, ExecutionStatus status, int failureCount) {
void handle(JobImpl job,
ExecutionStatus status,
int failureCount,
Throwable throwable) {
try {
switch (status) {
case QUEUE_INACTIVE:
Expand All @@ -79,7 +82,7 @@ void handle(JobImpl job, ExecutionStatus status, int failureCount) {
handleSuccessFullExecution(job, failureCount);
break;
case FAILED:
handleFailure(job, failureCount);
handleFailure(job, failureCount, throwable);
break;
default:
throw new UnknownSwitchCase(String.valueOf(status));
Expand Down Expand Up @@ -153,7 +156,7 @@ private void moveMessageToQueue(
});
}

private void moveMessageToDlq(JobImpl job, int failureCount) {
private void moveMessageToDlq(JobImpl job, int failureCount, Throwable throwable) {
log(
Level.DEBUG,
"Message {} Moved to dead letter queue: {}",
Expand Down Expand Up @@ -184,7 +187,8 @@ private void moveMessageToDlq(JobImpl job, int failureCount) {
newMessage.setFailureCount(0);
newMessage.setSourceQueueName(rqueueMessage.getQueueName());
newMessage.setSourceQueueFailureCount(failureCount);
long backOff = taskExecutionBackoff.nextBackOff(userMessage, newMessage, failureCount);
long backOff = taskExecutionBackoff.nextBackOff(userMessage, newMessage, failureCount,
throwable);
backOff =
(backOff == TaskExecutionBackOff.STOP)
? FixedTaskExecutionBackOff.DEFAULT_INTERVAL
Expand Down Expand Up @@ -256,9 +260,9 @@ private void handleSuccessFullExecution(JobImpl job, int failureCount) {
deleteMessage(job, MessageStatus.SUCCESSFUL, failureCount);
}

private void handleRetryExceededMessage(JobImpl job, int failureCount) {
private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable throwable) {
if (job.getQueueDetail().isDlqSet()) {
moveMessageToDlq(job, failureCount);
moveMessageToDlq(job, failureCount, throwable);
} else {
discardMessage(job, failureCount);
}
Expand All @@ -270,18 +274,19 @@ private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetai
: rqueueMessage.getRetryCount();
}

private void handleFailure(JobImpl job, int failureCount) {
private void handleFailure(JobImpl job, int failureCount, Throwable throwable) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to check for getDoNotRetry() classes here too?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need here as this is just to get the stop flag for backoff method

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we need that here too, my bad.

int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail());
if (failureCount < maxRetryCount) {
long delay =
taskExecutionBackoff.nextBackOff(job.getMessage(), job.getRqueueMessage(), failureCount);
taskExecutionBackoff.nextBackOff(job.getMessage(), job.getRqueueMessage(), failureCount,
throwable);
if (delay == TaskExecutionBackOff.STOP) {
handleRetryExceededMessage(job, failureCount);
handleRetryExceededMessage(job, failureCount, throwable);
} else {
parkMessageForRetry(job, null, failureCount, delay);
}
} else {
handleRetryExceededMessage(job, failureCount);
handleRetryExceededMessage(job, failureCount, throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2019-2024 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class QueueDetail extends SerializableBase {
private final int batchSize;
private Map<String, Integer> priority;
private String priorityGroup;
private Set<Class<? extends Throwable>> doNotRetry;

public boolean isDlqSet() {
return !StringUtils.isEmpty(deadLetterQueueName);
Expand Down Expand Up @@ -100,7 +102,7 @@ public QueueConfig toConfig() {
}

List<QueueDetail> expandQueueDetail(boolean addDefault, int priority) {
List<QueueDetail> queueDetails = new ArrayList<>();
List<QueueDetail> queueDetails = new ArrayList<>(1 + getPriority().size());
for (Entry<String, Integer> entry : getPriority().entrySet()) {
QueueDetail cloneQueueDetail = cloneQueueDetail(entry.getKey(), entry.getValue(), name);
queueDetails.add(cloneQueueDetail);
Expand Down Expand Up @@ -145,6 +147,7 @@ private QueueDetail cloneQueueDetail(
.priorityGroup(priorityGroup)
.concurrency(concurrency)
.priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, priority))
.doNotRetry(doNotRetry)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 Sonu Kumar
* Copyright (c) 2020-2024 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,8 @@
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -111,14 +113,14 @@ private void init() {
}

private int getMaxRetryCount() {
return job.getRqueueMessage().getRetryCount() == null
return Objects.isNull(job.getRqueueMessage().getRetryCount())
? job.getQueueDetail().getNumRetry()
: job.getRqueueMessage().getRetryCount();
}

private void updateCounter(boolean fail) {
RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter();
if (counter == null) {
if (Objects.isNull(counter)) {
return;
}
if (fail) {
Expand Down Expand Up @@ -205,8 +207,7 @@ private void updateToProcessing() {
this.job.updateMessageStatus(MessageStatus.PROCESSING);
}

private void logExecutionTimeWarning(
long maxProcessingTime, long startTime, ExecutionStatus status) {
private void logExecutionTimeWarning(long maxProcessingTime, long startTime) {
if (System.currentTimeMillis() > maxProcessingTime) {
long maxAllowedTime = maxExecutionTime();
long executionTime = System.currentTimeMillis() - startTime;
Expand All @@ -228,11 +229,7 @@ private void begin() {
}

private void end() {
if (status == null) {
job.updateExecutionStatus(ExecutionStatus.FAILED, error);
} else {
job.updateExecutionStatus(status, error);
}
job.updateExecutionStatus(status, error);
}

private void callMiddlewares(int currentIndex, List<Middleware> middlewares, Job job)
Expand All @@ -252,7 +249,7 @@ private void callMiddlewares(int currentIndex, List<Middleware> middlewares, Job
}

private void processMessage() throws Exception {
if (middlewareList == null) {
if (Objects.isNull(middlewareList)) {
callMiddlewares(0, Collections.emptyList(), job);
} else {
callMiddlewares(0, middlewareList, job);
Expand All @@ -269,14 +266,30 @@ private void execute() {
updateCounter(true);
failureCount += 1;
error = e;
} catch (Exception e) {
status = ExecutionStatus.FAILED;
} catch (Throwable e) {
updateCounter(true);
failureCount += 1;
error = e;
status = ExecutionStatus.FAILED;
log(Level.ERROR, "Message execution failed, RqueueMessage: {}", e, job.getRqueueMessage());
}
}

private boolean shouldRetry(long maxProcessingTime, int retryCount) {
if (retryCount > 0 &&
ExecutionStatus.FAILED.equals(status) &&
System.currentTimeMillis() < maxProcessingTime) {
Set<Class<? extends Throwable>> exceptions = queueDetail.getDoNotRetry();
boolean doNoRetry = Objects.nonNull(exceptions) &&
!exceptions.isEmpty() &&
Objects.nonNull(error) &&
exceptions.contains(error.getClass());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure about this? I think error.getClass() will always be a MessagingException - its cause will be the real exception (e.g. DoNotRetry2Exception)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, fixing this then causes the test to BootDoNotRetryTest to fail.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it's a bit more complicated:

If an exception is thrown in the handler, it will be wrapped by a MessagingException, and we have to check if error.getCause().getClass() is in the set of exceptions.

If an exception is thrown in middleware, then it will not (necessarily) be a MessagingException.

I could live with throwing (or re-throwing) only in the middleware, but I'm not sure if this is a good expectation.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, you're right. In such cases we can check two things if it contains the actual class or it contains the cause. That should work, let me know if you think otherwise

return !doNoRetry;
}
return false;
}

private void handleMessage() {
long maxProcessingTime = getMaxProcessingTime();
long startTime = System.currentTimeMillis();
Expand All @@ -285,23 +298,22 @@ private void handleMessage() {
do {
log(Level.DEBUG, "Attempt {} message: {}", null, attempt, job.getMessage());
begin();
if (status == null) {
if (Objects.isNull(status)) {
execute();
}
retryCount -= 1;
attempt += 1;
end();
} while (retryCount > 0 && status == null && System.currentTimeMillis() < maxProcessingTime);
postProcessingHandler.handle(
job, (status == null ? ExecutionStatus.FAILED : status), failureCount);
logExecutionTimeWarning(maxProcessingTime, startTime, status);
} while (shouldRetry(maxProcessingTime, retryCount));
postProcessingHandler.handle(job, status, failureCount, error);
logExecutionTimeWarning(maxProcessingTime, startTime);
}

private long getTtlForScheduledMessageKey(RqueueMessage message) {
// Assume a message can be executing for at most 2x of their visibility timeout
// due to failure in some other job same message should not be enqueued
long expiryInSeconds = 2 * job.getQueueDetail().getVisibilityTimeout() / ONE_MILLI;
// A message wil be processed after period, so it must stay in the system till that time
// A message wil be processed after a period, so it must stay in the system till that time
// how many more seconds are left to process this message
long remainingTime = (message.getProcessAt() - System.currentTimeMillis()) / ONE_MILLI;
if (remainingTime > 0) {
Expand All @@ -311,8 +323,8 @@ private long getTtlForScheduledMessageKey(RqueueMessage message) {
}

private String getScheduledMessageKey(RqueueMessage message) {
// avoid duplicate message enqueue due to retry by checking the message key
// avoid cross slot error by using tagged queue name in the key
// avoid a duplicate message enqueue due to retry by checking the message key
// avoid cross-slot error by using tagged queue name in the key
// enqueuing duplicate message can lead to duplicate consumption when one job is executing task
// at the same time this message was enqueued.
return String.format(
Expand Down
Loading