-
-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do not retry some exceptions #231
Changes from 11 commits
40e7813
fd312d3
2a2ccc9
b57ff96
b856cd2
1958254
bdc2343
36bbac4
116d13b
d162acd
fd924b3
7066b22
073cb8c
6bc5f48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright (c) 2020-2023 Sonu Kumar | ||
* Copyright (c) 2020-2024 Sonu Kumar | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* You may not use this file except in compliance with the License. | ||
|
@@ -35,6 +35,8 @@ | |
import com.github.sonus21.rqueue.utils.QueueThreadPool; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import org.slf4j.LoggerFactory; | ||
import org.slf4j.event.Level; | ||
import org.springframework.messaging.Message; | ||
|
@@ -111,14 +113,14 @@ private void init() { | |
} | ||
|
||
private int getMaxRetryCount() { | ||
return job.getRqueueMessage().getRetryCount() == null | ||
return Objects.isNull(job.getRqueueMessage().getRetryCount()) | ||
? job.getQueueDetail().getNumRetry() | ||
: job.getRqueueMessage().getRetryCount(); | ||
} | ||
|
||
private void updateCounter(boolean fail) { | ||
RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter(); | ||
if (counter == null) { | ||
if (Objects.isNull(counter)) { | ||
return; | ||
} | ||
if (fail) { | ||
|
@@ -205,8 +207,7 @@ private void updateToProcessing() { | |
this.job.updateMessageStatus(MessageStatus.PROCESSING); | ||
} | ||
|
||
private void logExecutionTimeWarning( | ||
long maxProcessingTime, long startTime, ExecutionStatus status) { | ||
private void logExecutionTimeWarning(long maxProcessingTime, long startTime) { | ||
if (System.currentTimeMillis() > maxProcessingTime) { | ||
long maxAllowedTime = maxExecutionTime(); | ||
long executionTime = System.currentTimeMillis() - startTime; | ||
|
@@ -228,11 +229,7 @@ private void begin() { | |
} | ||
|
||
private void end() { | ||
if (status == null) { | ||
job.updateExecutionStatus(ExecutionStatus.FAILED, error); | ||
} else { | ||
job.updateExecutionStatus(status, error); | ||
} | ||
job.updateExecutionStatus(status, error); | ||
} | ||
|
||
private void callMiddlewares(int currentIndex, List<Middleware> middlewares, Job job) | ||
|
@@ -252,7 +249,7 @@ private void callMiddlewares(int currentIndex, List<Middleware> middlewares, Job | |
} | ||
|
||
private void processMessage() throws Exception { | ||
if (middlewareList == null) { | ||
if (Objects.isNull(middlewareList)) { | ||
callMiddlewares(0, Collections.emptyList(), job); | ||
} else { | ||
callMiddlewares(0, middlewareList, job); | ||
|
@@ -269,14 +266,30 @@ private void execute() { | |
updateCounter(true); | ||
failureCount += 1; | ||
error = e; | ||
} catch (Exception e) { | ||
status = ExecutionStatus.FAILED; | ||
} catch (Throwable e) { | ||
updateCounter(true); | ||
failureCount += 1; | ||
error = e; | ||
status = ExecutionStatus.FAILED; | ||
log(Level.ERROR, "Message execution failed, RqueueMessage: {}", e, job.getRqueueMessage()); | ||
} | ||
} | ||
|
||
private boolean shouldRetry(long maxProcessingTime, int retryCount) { | ||
if (retryCount > 0 && | ||
ExecutionStatus.FAILED.equals(status) && | ||
System.currentTimeMillis() < maxProcessingTime) { | ||
Set<Class<? extends Throwable>> exceptions = queueDetail.getDoNotRetry(); | ||
boolean doNoRetry = Objects.nonNull(exceptions) && | ||
!exceptions.isEmpty() && | ||
Objects.nonNull(error) && | ||
exceptions.contains(error.getClass()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure about this? I think error.getClass() will always be a MessagingException - its cause will be the real exception (e.g. DoNotRetry2Exception) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. However, fixing this then causes the test to BootDoNotRetryTest to fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, it's a bit more complicated: If an exception is thrown in the handler, it will be wrapped by a MessagingException, and we have to check if If an exception is thrown in middleware, then it will not (necessarily) be a MessagingException. I could live with throwing (or re-throwing) only in the middleware, but I'm not sure if this is a good expectation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, you're right. In such cases we can check two things if it contains the actual class or it contains the cause. That should work, let me know if you think otherwise |
||
return !doNoRetry; | ||
} | ||
return false; | ||
} | ||
|
||
private void handleMessage() { | ||
long maxProcessingTime = getMaxProcessingTime(); | ||
long startTime = System.currentTimeMillis(); | ||
|
@@ -285,23 +298,22 @@ private void handleMessage() { | |
do { | ||
log(Level.DEBUG, "Attempt {} message: {}", null, attempt, job.getMessage()); | ||
begin(); | ||
if (status == null) { | ||
if (Objects.isNull(status)) { | ||
execute(); | ||
} | ||
retryCount -= 1; | ||
attempt += 1; | ||
end(); | ||
} while (retryCount > 0 && status == null && System.currentTimeMillis() < maxProcessingTime); | ||
postProcessingHandler.handle( | ||
job, (status == null ? ExecutionStatus.FAILED : status), failureCount); | ||
logExecutionTimeWarning(maxProcessingTime, startTime, status); | ||
} while (shouldRetry(maxProcessingTime, retryCount)); | ||
postProcessingHandler.handle(job, status, failureCount, error); | ||
logExecutionTimeWarning(maxProcessingTime, startTime); | ||
} | ||
|
||
private long getTtlForScheduledMessageKey(RqueueMessage message) { | ||
// Assume a message can be executing for at most 2x of their visibility timeout | ||
// due to failure in some other job same message should not be enqueued | ||
long expiryInSeconds = 2 * job.getQueueDetail().getVisibilityTimeout() / ONE_MILLI; | ||
// A message wil be processed after period, so it must stay in the system till that time | ||
// A message wil be processed after a period, so it must stay in the system till that time | ||
// how many more seconds are left to process this message | ||
long remainingTime = (message.getProcessAt() - System.currentTimeMillis()) / ONE_MILLI; | ||
if (remainingTime > 0) { | ||
|
@@ -311,8 +323,8 @@ private long getTtlForScheduledMessageKey(RqueueMessage message) { | |
} | ||
|
||
private String getScheduledMessageKey(RqueueMessage message) { | ||
// avoid duplicate message enqueue due to retry by checking the message key | ||
// avoid cross slot error by using tagged queue name in the key | ||
// avoid a duplicate message enqueue due to retry by checking the message key | ||
// avoid cross-slot error by using tagged queue name in the key | ||
// enqueuing duplicate message can lead to duplicate consumption when one job is executing task | ||
// at the same time this message was enqueued. | ||
return String.format( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need to check for getDoNotRetry() classes here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need here as this is just to get the stop flag for backoff method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we need that here too, my bad.