Skip to content

Commit

Permalink
Update handle assignment error metric (#967)
Browse files Browse the repository at this point in the history
* Update handle assignment error metric

* Address feedback
  • Loading branch information
jogrogan authored Nov 6, 2023
1 parent c082369 commit 9aac481
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ public void onSessionExpired() {
threadPoolExecutor.submit(() -> {
Instant start = Instant.now();
try {
getAssignmentsFuture(assignmentChangeFutures, start);
getAssignmentsFuture(assignmentChangeFutures, start, false);
} catch (Exception e) {
_log.warn("Hit exception while clearing the assignment list", e);
} finally {
Expand Down Expand Up @@ -701,7 +701,8 @@ public void onNewSession() {
_zkSessionExpired = false;
}

private void getAssignmentsFuture(List<Future<Boolean>> assignmentChangeFutures, Instant start)
private void getAssignmentsFuture(List<Future<Boolean>> assignmentChangeFutures, Instant start,
boolean isDatastreamUpdate)
throws TimeoutException, InterruptedException {
for (Future<Boolean> assignmentChangeFuture : assignmentChangeFutures) {
if (Duration.between(start, Instant.now()).compareTo(ASSIGNMENT_TIMEOUT) > 0) {
Expand All @@ -711,6 +712,8 @@ private void getAssignmentsFuture(List<Future<Boolean>> assignmentChangeFutures,
assignmentChangeFuture.get(ASSIGNMENT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
_log.warn("onAssignmentChange call threw exception", e);
EventType meter = isDatastreamUpdate ? HANDLE_DATASTREAM_CHANGE_WITH_UPDATE : HANDLE_ASSIGNMENT_CHANGE;
_metrics.updateKeyedMeter(CoordinatorMetrics.getKeyedMeter(meter), 1);
}
}
}
Expand Down Expand Up @@ -768,11 +771,11 @@ private void retryHandleAssignmentChange(boolean isDatastreamUpdate) {
"onAssignmentChange retries reached threshold of {}. Not queuing further retry on onAssignmentChange event.",
_config.getMaxAssignmentRetryCount());
queueEvent = false;
}

EventType meter = isDatastreamUpdate ? HANDLE_DATASTREAM_CHANGE_WITH_UPDATE : HANDLE_ASSIGNMENT_CHANGE;
_log.warn("Updating metric for event " + meter);
_metrics.updateKeyedMeter(CoordinatorMetrics.getKeyedMeter(meter), 1);
EventType meter = isDatastreamUpdate ? HANDLE_DATASTREAM_CHANGE_WITH_UPDATE : HANDLE_ASSIGNMENT_CHANGE;
_log.warn("Updating metric for event={}", meter);
_metrics.updateKeyedMeter(CoordinatorMetrics.getKeyedMeter(meter), 1);
}

if (queueEvent) {
_log.warn("Queuing onAssignmentChange event");
Expand Down Expand Up @@ -847,7 +850,7 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
// Wait till all the futures are complete or timeout.
Instant start = Instant.now();
try {
getAssignmentsFuture(assignmentChangeFutures, start);
getAssignmentsFuture(assignmentChangeFutures, start, isDatastreamUpdate);
} catch (TimeoutException e) {
// if it's timeout then we will retry
_log.warn("Timeout when doing the assignment", e);
Expand Down

0 comments on commit 9aac481

Please sign in to comment.