From 8590725888f5492cf0f259922b60e287ed0a7a5c Mon Sep 17 00:00:00 2001 From: summaryzb Date: Sun, 22 Oct 2023 23:33:36 +0800 Subject: [PATCH] [#1252] fix(server): Incorrect storage write fail metric --- .../uniffle/server/ShuffleFlushManager.java | 80 ++++++++++++------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 036ad69d09..74280307a6 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -83,36 +83,37 @@ public void addToFlushQueue(ShuffleDataFlushEvent event) { eventHandler.handle(event); } + private void recordFinalFail(ShuffleDataFlushEvent event, long start) { + LOG.error( + "Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax); + if (event.getUnderStorage() != null) { + ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost()); + } + event.doCleanup(); + if (shuffleServer != null) { + long duration = System.currentTimeMillis() - start; + ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc(); + LOG.error( + "Flush to file for {} failed in {} ms and release {} bytes", + event, + duration, + event.getSize()); + } + } + public void processEvent(ShuffleDataFlushEvent event) { try { ShuffleServerMetrics.gaugeWriteHandler.inc(); long start = System.currentTimeMillis(); - boolean writeSuccess = flushToFile(event); - if (writeSuccess || event.getRetryTimes() > retryMax) { - if (event.getRetryTimes() > retryMax) { - LOG.error( - "Failed to write data for {} in {} times, shuffle data will be lost", - event, - retryMax); - if (event.getUnderStorage() != null) { - ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost()); - } - } + boolean writeSuccess = flushToFile(event, start); + // we should not call any + if (writeSuccess) { event.doCleanup(); if (shuffleServer != null) { - long duration = System.currentTimeMillis() - start; - if (writeSuccess) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Flush to file success in {} ms and release {} bytes", duration, event.getSize()); - } - } else { - ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc(); - LOG.error( - "Flush to file for {} failed in {} ms and release {} bytes", - event, - duration, - event.getSize()); + if (LOG.isDebugEnabled()) { + long duration = System.currentTimeMillis() - start; + LOG.debug( + "Flush to file success in {} ms and release {} bytes", duration, event.getSize()); } } } @@ -124,7 +125,11 @@ public void processEvent(ShuffleDataFlushEvent event) { } } - private boolean flushToFile(ShuffleDataFlushEvent event) { + private boolean reachRetryMax(ShuffleDataFlushEvent event) { + return event.getRetryTimes() > retryMax; + } + + private boolean flushToFile(ShuffleDataFlushEvent event, long start) { boolean writeSuccess = false; try { @@ -160,7 +165,7 @@ private boolean flushToFile(ShuffleDataFlushEvent event) { if (!storage.canWrite()) { // todo: Could we add an interface supportPending for storageManager // to unify following logic of multiple different storage managers - if (event.getRetryTimes() <= retryMax) { + if (!reachRetryMax(event)) { if (event.isPended()) { LOG.error( "Drop this event directly due to already having entered pending queue. event: {}", @@ -168,9 +173,15 @@ private boolean flushToFile(ShuffleDataFlushEvent event) { return true; } event.increaseRetryTimes(); - ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); event.markPended(); - eventHandler.handle(event); + if (!reachRetryMax(event)) { + ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); + eventHandler.handle(event); + } else { + recordFinalFail(event, start); + } + } else { + recordFinalFail(event, start); } return false; } @@ -198,21 +209,28 @@ private boolean flushToFile(ShuffleDataFlushEvent event) { if (writeSuccess) { updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks); ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost()); - } else if (event.getRetryTimes() <= retryMax) { + } else if (!reachRetryMax(event)) { if (event.isPended()) { LOG.error( "Drop this event directly due to already having entered pending queue. event: {}", event); } event.increaseRetryTimes(); - ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); event.markPended(); - eventHandler.handle(event); + if (!reachRetryMax(event)) { + ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); + eventHandler.handle(event); + } else { + recordFinalFail(event, start); + } } } catch (Throwable throwable) { // just log the error, don't throw the exception and stop the flush thread LOG.error("Exception happened when process flush shuffle data for {}", event, throwable); event.increaseRetryTimes(); + if (reachRetryMax(event)) { + recordFinalFail(event, start); + } } return writeSuccess; }