Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ private MetricConstants() {}
/** Metric to record the queueing delay time for a worker queue. */
public static final String WORKER_QUEUE_QUEUEING_DELAY = "maestro.worker.queue.queueing.delay";

/** Metric to count for the queue worker internal error. */
public static final String QUEUE_WORKER_INTERNAL_ERROR = "maestro.queue.worker.internal.error";
/** Metric to count for the queue worker process error. */
public static final String QUEUE_WORKER_PROCESS_ERROR = "maestro.queue.worker.process.error";

/** Metric to count for the queue system enqueue transaction. */
public static final String QUEUE_SYSTEM_ENQUEUE_TRANSACTION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.maestro.queue.worker;

import com.netflix.maestro.exceptions.MaestroInternalError;
import com.netflix.maestro.exceptions.MaestroNotFoundException;
import com.netflix.maestro.metrics.MaestroMetrics;
import com.netflix.maestro.queue.dao.MaestroQueueDao;
import com.netflix.maestro.queue.jobevents.MaestroJobEvent;
Expand Down Expand Up @@ -85,11 +86,12 @@ public void run() {
if (running) { // double check to avoid extra run but no guarantee
processMessage(message);
}
} catch (MaestroInternalError m) {
// ignore the internal errored message
LOG.warn("[{}] got an internal error for message [{}] and ignore it", name, message, m);
} catch (MaestroNotFoundException | MaestroInternalError m) {
Copy link

Copilot AI Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider ordering the exception types by specificity, with the more specific MaestroNotFoundException first: } catch (MaestroNotFoundException | MaestroInternalError m) {. This follows the convention of handling more specific exceptions before more general ones.

Copilot uses AI. Check for mistakes.
// ignore not found exception or internal error
LOG.warn(
"[{}] got a non-retryable error for message [{}] and ignoring it", name, message, m);
metrics.counter(
MetricConstants.QUEUE_WORKER_INTERNAL_ERROR,
MetricConstants.QUEUE_WORKER_PROCESS_ERROR,
getClass(),
MetricConstants.RETRYABLE_TAG,
"false",
Expand All @@ -100,7 +102,7 @@ public void run() {
"[{}] got an exception for message [{}] and will retry the message", name, message, e);
requeue(message, retryInterval); // requeue and retry the message after some interval
metrics.counter(
MetricConstants.QUEUE_WORKER_INTERNAL_ERROR,
MetricConstants.QUEUE_WORKER_PROCESS_ERROR,
getClass(),
MetricConstants.RETRYABLE_TAG,
"true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.netflix.maestro.MaestroBaseTest;
import com.netflix.maestro.exceptions.MaestroInternalError;
import com.netflix.maestro.exceptions.MaestroNotFoundException;
import com.netflix.maestro.metrics.MaestroMetrics;
import com.netflix.maestro.queue.dao.MaestroQueueDao;
import com.netflix.maestro.queue.jobevents.MaestroJobEvent;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testRunWithProcessException() throws InterruptedException {
verify(queueDao, times(1)).release(anyInt(), anyLong(), any());
verify(metrics, times(1))
.counter(
MetricConstants.QUEUE_WORKER_INTERNAL_ERROR,
MetricConstants.QUEUE_WORKER_PROCESS_ERROR,
MaestroQueueWorker.class,
MetricConstants.RETRYABLE_TAG,
"true",
Expand All @@ -133,7 +134,27 @@ public void testRunWithProcessInternalError() throws InterruptedException {
verify(queueDao, times(1)).release(anyInt(), anyLong(), any());
verify(metrics, times(1))
.counter(
MetricConstants.QUEUE_WORKER_INTERNAL_ERROR,
MetricConstants.QUEUE_WORKER_PROCESS_ERROR,
MaestroQueueWorker.class,
MetricConstants.RETRYABLE_TAG,
"false",
MetricConstants.TYPE_TAG,
"START_WORKFLOW");
}

@Test
@SuppressWarnings("unchecked")
public void testRunWithProcessNotFoundException() throws InterruptedException {
when(messageQueue.take()).thenReturn(message).thenThrow(new InterruptedException("test"));
when(dispatcher.processJobEvent(any())).thenThrow(new MaestroNotFoundException("test"));
queueWorker.run();
verify(dispatcher, times(1)).processJobEvent(any());
verify(scheduler, times(0)).schedule(any(Callable.class), anyLong(), eq(TimeUnit.MILLISECONDS));
verify(messageQueue, times(1)).drainTo(any(), anyInt());
verify(queueDao, times(1)).release(anyInt(), anyLong(), any());
verify(metrics, times(1))
.counter(
MetricConstants.QUEUE_WORKER_PROCESS_ERROR,
MaestroQueueWorker.class,
MetricConstants.RETRYABLE_TAG,
"false",
Expand Down Expand Up @@ -196,7 +217,7 @@ public void testRunAndExtendOwnership() throws InterruptedException {
verify(queueDao, times(1)).release(anyInt(), anyLong(), any());
verify(metrics, times(1))
.counter(
MetricConstants.QUEUE_WORKER_INTERNAL_ERROR,
MetricConstants.QUEUE_WORKER_PROCESS_ERROR,
MaestroQueueWorker.class,
MetricConstants.RETRYABLE_TAG,
"true",
Expand Down