Skip to content

Commit

Permalink
[apache#1252] fix(server): Incorrect storage write fail metric
Browse files Browse the repository at this point in the history
  • Loading branch information
summaryzb committed Oct 22, 2023
1 parent 4f56e59 commit 8590725
Showing 1 changed file with 49 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,36 +83,37 @@ public void addToFlushQueue(ShuffleDataFlushEvent event) {
eventHandler.handle(event);
}

private void recordFinalFail(ShuffleDataFlushEvent event, long start) {
LOG.error(
"Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax);
if (event.getUnderStorage() != null) {
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
}
event.doCleanup();
if (shuffleServer != null) {
long duration = System.currentTimeMillis() - start;
ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
LOG.error(
"Flush to file for {} failed in {} ms and release {} bytes",
event,
duration,
event.getSize());
}
}

public void processEvent(ShuffleDataFlushEvent event) {
try {
ShuffleServerMetrics.gaugeWriteHandler.inc();
long start = System.currentTimeMillis();
boolean writeSuccess = flushToFile(event);
if (writeSuccess || event.getRetryTimes() > retryMax) {
if (event.getRetryTimes() > retryMax) {
LOG.error(
"Failed to write data for {} in {} times, shuffle data will be lost",
event,
retryMax);
if (event.getUnderStorage() != null) {
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
}
}
boolean writeSuccess = flushToFile(event, start);
// we should not call any
if (writeSuccess) {
event.doCleanup();
if (shuffleServer != null) {
long duration = System.currentTimeMillis() - start;
if (writeSuccess) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Flush to file success in {} ms and release {} bytes", duration, event.getSize());
}
} else {
ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
LOG.error(
"Flush to file for {} failed in {} ms and release {} bytes",
event,
duration,
event.getSize());
if (LOG.isDebugEnabled()) {
long duration = System.currentTimeMillis() - start;
LOG.debug(
"Flush to file success in {} ms and release {} bytes", duration, event.getSize());
}
}
}
Expand All @@ -124,7 +125,11 @@ public void processEvent(ShuffleDataFlushEvent event) {
}
}

private boolean flushToFile(ShuffleDataFlushEvent event) {
private boolean reachRetryMax(ShuffleDataFlushEvent event) {
return event.getRetryTimes() > retryMax;
}

private boolean flushToFile(ShuffleDataFlushEvent event, long start) {
boolean writeSuccess = false;

try {
Expand Down Expand Up @@ -160,17 +165,23 @@ private boolean flushToFile(ShuffleDataFlushEvent event) {
if (!storage.canWrite()) {
// todo: Could we add an interface supportPending for storageManager
// to unify following logic of multiple different storage managers
if (event.getRetryTimes() <= retryMax) {
if (!reachRetryMax(event)) {
if (event.isPended()) {
LOG.error(
"Drop this event directly due to already having entered pending queue. event: {}",
event);
return true;
}
event.increaseRetryTimes();
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
event.markPended();
eventHandler.handle(event);
if (!reachRetryMax(event)) {
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
eventHandler.handle(event);
} else {
recordFinalFail(event, start);
}
} else {
recordFinalFail(event, start);
}
return false;
}
Expand Down Expand Up @@ -198,21 +209,28 @@ private boolean flushToFile(ShuffleDataFlushEvent event) {
if (writeSuccess) {
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
} else if (event.getRetryTimes() <= retryMax) {
} else if (!reachRetryMax(event)) {
if (event.isPended()) {
LOG.error(
"Drop this event directly due to already having entered pending queue. event: {}",
event);
}
event.increaseRetryTimes();
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
event.markPended();
eventHandler.handle(event);
if (!reachRetryMax(event)) {
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
eventHandler.handle(event);
} else {
recordFinalFail(event, start);
}
}
} catch (Throwable throwable) {
// just log the error, don't throw the exception and stop the flush thread
LOG.error("Exception happened when process flush shuffle data for {}", event, throwable);
event.increaseRetryTimes();
if (reachRetryMax(event)) {
recordFinalFail(event, start);
}
}
return writeSuccess;
}
Expand Down

0 comments on commit 8590725

Please sign in to comment.