Skip to content

Commit

Permalink
[#1252] fix(server): Incorrect storage write fail metric (#1253)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
As title

Why are the changes needed?
Fix: #1252

Does this PR introduce any user-facing change?
No.

How was this patch tested?
unit test
  • Loading branch information
summaryzb authored Oct 25, 2023
1 parent ec7f85c commit 931d6cd
Showing 1 changed file with 57 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,39 +83,41 @@ public void addToFlushQueue(ShuffleDataFlushEvent event) {
eventHandler.handle(event);
}

private void recordFinalFail(ShuffleDataFlushEvent event, long start) {
LOG.error(
"Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax);
if (event.getUnderStorage() != null) {
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
}
event.doCleanup();
if (shuffleServer != null) {
long duration = System.currentTimeMillis() - start;
ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
LOG.error(
"Flush to file for {} failed in {} ms and release {} bytes",
event,
duration,
event.getSize());
}
}

private void recordSuccess(ShuffleDataFlushEvent event, long start) {
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost());
event.doCleanup();
if (shuffleServer != null) {
if (LOG.isDebugEnabled()) {
long duration = System.currentTimeMillis() - start;
LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
}
}
}

public void processEvent(ShuffleDataFlushEvent event) {
try {
ShuffleServerMetrics.gaugeWriteHandler.inc();
long start = System.currentTimeMillis();
boolean writeSuccess = flushToFile(event);
if (writeSuccess || event.getRetryTimes() > retryMax) {
if (event.getRetryTimes() > retryMax) {
LOG.error(
"Failed to write data for {} in {} times, shuffle data will be lost",
event,
retryMax);
if (event.getUnderStorage() != null) {
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
}
}
event.doCleanup();
if (shuffleServer != null) {
long duration = System.currentTimeMillis() - start;
if (writeSuccess) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Flush to file success in {} ms and release {} bytes", duration, event.getSize());
}
} else {
ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
LOG.error(
"Flush to file for {} failed in {} ms and release {} bytes",
event,
duration,
event.getSize());
}
}
}
flushToFile(event);
// for thread safety we should not use or change any event info when write to file is failed
} catch (Exception e) {
LOG.error("Exception happened when flush data for " + event, e);
} finally {
Expand All @@ -124,7 +126,12 @@ public void processEvent(ShuffleDataFlushEvent event) {
}
}

private boolean reachRetryMax(ShuffleDataFlushEvent event) {
return event.getRetryTimes() > retryMax;
}

private boolean flushToFile(ShuffleDataFlushEvent event) {
long start = System.currentTimeMillis();
boolean writeSuccess = false;

try {
Expand Down Expand Up @@ -160,17 +167,23 @@ private boolean flushToFile(ShuffleDataFlushEvent event) {
if (!storage.canWrite()) {
// todo: Could we add an interface supportPending for storageManager
// to unify following logic of multiple different storage managers
if (event.getRetryTimes() <= retryMax) {
if (!reachRetryMax(event)) {
if (event.isPended()) {
LOG.error(
"Drop this event directly due to already having entered pending queue. event: {}",
event);
return true;
}
event.increaseRetryTimes();
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
event.markPended();
eventHandler.handle(event);
if (!reachRetryMax(event)) {
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
eventHandler.handle(event);
} else {
recordFinalFail(event, start);
}
} else {
recordFinalFail(event, start);
}
return false;
}
Expand All @@ -196,23 +209,29 @@ private boolean flushToFile(ShuffleDataFlushEvent event) {
ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
writeSuccess = storageManager.write(storage, handler, event);
if (writeSuccess) {
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
} else if (event.getRetryTimes() <= retryMax) {
recordSuccess(event, start);
} else if (!reachRetryMax(event)) {
if (event.isPended()) {
LOG.error(
"Drop this event directly due to already having entered pending queue. event: {}",
event);
}
event.increaseRetryTimes();
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
event.markPended();
eventHandler.handle(event);
if (!reachRetryMax(event)) {
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
eventHandler.handle(event);
} else {
recordFinalFail(event, start);
}
}
} catch (Throwable throwable) {
// just log the error, don't throw the exception and stop the flush thread
LOG.error("Exception happened when process flush shuffle data for {}", event, throwable);
event.increaseRetryTimes();
if (reachRetryMax(event)) {
recordFinalFail(event, start);
}
}
return writeSuccess;
}
Expand Down

0 comments on commit 931d6cd

Please sign in to comment.