Skip to content

Commit

Permalink
[#1356] feat(server): improve expired buffers metric and log (#1469)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Improve the metric and the log message for expired pre-allocated buffers.

When a pre-allocated buffer expires, it indicates that the server or the client may have a problem,
so the server log should make it possible to identify the related apps.

### Why are the changes needed?

For #1356 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests
  • Loading branch information
zuston authored Jan 23, 2024
1 parent ca0f0ac commit ba25785
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public class ShuffleServerMetrics {
private static final String LOCAL_FILE_EVENT_FLUSH_NUM = "local_file_event_flush_num";
private static final String HADOOP_EVENT_FLUSH_NUM = "hadoop_event_flush_num";

private static final String TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM =
"total_expired_preAllocated_buffer_num";

private static final String TOTAL_REMOVE_RESOURCE_TIME = "total_remove_resource_time";
private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME =
"total_remove_resource_by_shuffle_ids_time";
Expand Down Expand Up @@ -205,6 +208,7 @@ public class ShuffleServerMetrics {
private static String tags;
public static Counter counterLocalFileEventFlush;
public static Counter counterHadoopEventFlush;
public static Counter counterPreAllocatedBufferExpired;

private static MetricsManager metricsManager;
private static boolean isRegister = false;
Expand Down Expand Up @@ -394,6 +398,9 @@ private static void setUpMetrics() {
counterLocalFileEventFlush = metricsManager.addCounter(LOCAL_FILE_EVENT_FLUSH_NUM);
counterHadoopEventFlush = metricsManager.addCounter(HADOOP_EVENT_FLUSH_NUM);

counterPreAllocatedBufferExpired =
metricsManager.addCounter(TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM);

summaryTotalRemoveResourceTime = metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME);
summaryTotalRemoveResourceByShuffleIdsTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,22 +478,27 @@ public long requireBuffer(
throw new NoBufferForHugePartitionException(errorMessage);
}
}
return requireBuffer(requireSize);
return requireBuffer(appId, requireSize);
}

public long requireBuffer(int requireSize) {
public long requireBuffer(String appId, int requireSize) {
if (shuffleBufferManager.requireMemory(requireSize, true)) {
long requireId = requireBufferId.incrementAndGet();
requireBufferIds.put(
requireId,
new PreAllocatedBufferInfo(requireId, System.currentTimeMillis(), requireSize));
new PreAllocatedBufferInfo(appId, requireId, System.currentTimeMillis(), requireSize));
return requireId;
} else {
LOG.error("Failed to require buffer, require size: {}", requireSize);
throw new NoBufferException("No Buffer For Regular Partition, requireSize: " + requireSize);
}
}

/**
 * Requires a pre-allocated buffer for a caller that does not supply an application id.
 *
 * <p>Delegates to {@code requireBuffer(String, int)} using the placeholder app id
 * {@code "EMPTY"}, so the buffer is recorded under that placeholder instead of a real app id.
 *
 * @param requireSize the requested buffer size, passed through unchanged to the delegate
 * @return the require id returned by {@code requireBuffer(String, int)}
 */
public long requireBuffer(int requireSize) {
// An appId of "EMPTY" marks a request from an old-version client (one that should be
// upgraded) that does not provide its app id.
return requireBuffer("EMPTY", requireSize);
}

public byte[] getFinishedBlockIds(String appId, Integer shuffleId, Set<Integer> partitions)
throws IOException {
refreshAppId(appId);
Expand Down Expand Up @@ -781,9 +786,13 @@ private void preAllocatedBufferCheck() {
// move release memory code down to here as the requiredBuffer could be consumed during
// removing processing.
shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true);
LOG.info("Remove expired preAllocatedBuffer " + requireId);
LOG.warn(
"Remove expired preAllocatedBuffer[id={}] that required by app: {}",
requireId,
info.getAppId());
ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc();
} else {
LOG.info("PreAllocatedBuffer[id={}] has already been removed", requireId);
LOG.info("PreAllocatedBuffer[id={}] has already be used", requireId);
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.uniffle.server.buffer;

public class PreAllocatedBufferInfo {

private String appId;
private long requireId;
private long timestamp;
private int requireSize;

public PreAllocatedBufferInfo(long requireId, long timestamp, int requireSize) {
public PreAllocatedBufferInfo(String appId, long requireId, long timestamp, int requireSize) {
this.appId = appId;
this.requireId = requireId;
this.timestamp = timestamp;
this.requireSize = requireSize;
Expand All @@ -40,4 +41,8 @@ public long getTimestamp() {
public int getRequireSize() {
return requireSize;
}

public String getAppId() {
return appId;
}
}

0 comments on commit ba25785

Please sign in to comment.