Skip to content

Commit

Permalink
[#1585] feat(server): Support app-level block size statistics to repo…
Browse files Browse the repository at this point in the history
…rt metrics (#1593)

### What changes were proposed in this pull request?

Added an app-level shuffle block size metric of type histogram.

### Why are the changes needed?
Related feature: #1585

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

Co-authored-by: leslizhang <leslizhang@tencent.com>
Co-authored-by: Enrico Minack <github@enrico.minack.dev>
  • Loading branch information
3 people authored Apr 30, 2024
1 parent 636d0be commit 89d2f1c
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,14 @@ public List<ShuffleBlockInfo> addPartitionData(

// check buffer size > spill threshold
if (usedBytes.get() - inSendListBytes.get() > spillSize) {
LOG.info(
"ShuffleBufferManager spill for buffer size exceeding spill threshold,"
+ "usedBytes[{}],inSendListBytes[{}],spillSize[{}]",
usedBytes.get(),
inSendListBytes.get(),
spillSize);
List<ShuffleBlockInfo> multiSendingBlocks = clear();

multiSendingBlocks.addAll(singleOrEmptySendingBlocks);
writeTime += System.currentTimeMillis() - start;
return multiSendingBlocks;
Expand Down Expand Up @@ -316,6 +323,8 @@ public synchronized List<ShuffleBlockInfo> clear() {
+ dataSize
+ "], memoryUsed["
+ memoryUsed
+ "], number of blocks["
+ result.size()
+ "]");
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,17 @@ static Double convertToDouble(Object o) {
return Double.parseDouble(o.toString());
}

/**
 * Converts a comma-separated list of byte-size strings into a {@code double} array.
 *
 * <p>The argument's {@code toString()} form is split on {@code ","}; every token is passed to
 * {@link UnitConverter#byteStringAsBytes} and the result is widened to {@code double}. The
 * output keeps the order of the tokens.
 *
 * @param o the value holding the list; may be {@code null}
 * @return one entry per token, or an empty array when {@code o} is {@code null}
 */
public static double[] convertBytesStringToDoubleArray(Object o) {
  if (o == null) {
    return new double[0];
  }
  String[] sizeTokens = o.toString().split(",");
  double[] sizes = new double[sizeTokens.length];
  for (int i = 0; i < sizeTokens.length; i++) {
    sizes[i] = (double) UnitConverter.byteStringAsBytes(sizeTokens[i]);
  }
  return sizes;
}

@SuppressWarnings("unchecked")
public static List<ConfigOption<Object>> getAllConfigOptions(
Class<? extends RssBaseConf> confClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public Histogram addHistogram(String name, double[] buckets, String... labels) {
return addHistogram(name, "Histogram " + name, buckets, labels);
}

/**
 * Adds a histogram that uses {@code defaultLabelNames} as its labels.
 *
 * <p>The help text is {@code "Histogram " + name}; the work is delegated to the four-argument
 * overload.
 *
 * @param name the metric name
 * @param buckets the bucket boundaries handed to the four-argument overload
 * @return the histogram returned by the four-argument overload
 */
public Histogram addHistogram(String name, double[] buckets) {
  String help = "Histogram " + name;
  return addHistogram(name, help, buckets, defaultLabelNames);
}

public Histogram addHistogram(String name, String help, double[] buckets, String[] labels) {
return Histogram.build()
.name(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ private Constants() {}
// Version string used for client/server compatibility checks, e.g. during an online upgrade.
public static final String SHUFFLE_SERVER_VERSION = "ss_v5";
// Label key under which ShuffleServerMetrics.register stores its tags string.
public static final String METRICS_TAG_LABEL_NAME = "tags";
// Label name of the per-app write_block_size histogram created in ShuffleServerMetrics.
public static final String METRICS_APP_LABEL_NAME = "appId";
public static final String COORDINATOR_TAG = "coordinator";
// File name suffixes for shuffle data and index files.
public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void registerMetrics() {
LOG.info("Register metrics");
CollectorRegistry shuffleServerCollectorRegistry = new CollectorRegistry(true);
String rawTags = getEncodedTags();
ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags);
ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags, shuffleServerConf);
grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, rawTags);
grpcMetrics.register(new CollectorRegistry(true));
nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, rawTags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,20 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription(
"keep alive time of thread pool that used for calc summary metric, in SECONDS.");

/**
 * Switch for the app-level shuffle block size metric; {@code false} by default.
 *
 * <p>ShuffleBufferManager reads this once in its constructor. When it is {@code true}, the
 * length of every block passed to cacheShuffleData is observed into
 * ShuffleServerMetrics.appHistogramWriteBlockSize under the app's id, and removeBuffer drops
 * that app's label again.
 */
public static final ConfigOption<Boolean> APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED =
ConfigOptions.key("rss.server.metrics.blockSizeStatisticsEnabled")
.booleanType()
.defaultValue(false)
.withDescription("whether or not shuffle block size metric is enabled");

/**
 * Bucket boundaries of the app-level shuffle block size histogram, as a comma-separated list
 * of size strings.
 *
 * <p>ShuffleServerMetrics.setUpMetrics reads this value and parses it with
 * ConfigUtils.convertBytesStringToDoubleArray when it creates the write_block_size histogram.
 * NOTE(review): in the visible code the histogram is created without consulting
 * APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED, so this value appears to be parsed even when
 * the metric is disabled - confirm.
 */
public static final ConfigOption<String> APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS =
ConfigOptions.key("rss.server.metrics.blockSizeStatistics.buckets")
.stringType()
.defaultValue("32kb,64kb,128kb,256kb,512kb,1mb,2mb,4mb,8mb,16mb")
.withDescription(
"A comma-separated block size list, where each value"
+ " can be suffixed with a memory size unit, such as kb or k, mb or m, etc.");

public ShuffleServerConf() {}

public ShuffleServerConf(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;
import org.apache.commons.lang3.StringUtils;

import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.LocalStorage;

import static org.apache.uniffle.common.util.Constants.METRICS_APP_LABEL_NAME;

public class ShuffleServerMetrics {

private static final String TOTAL_RECEIVED_DATA = "total_received_data";
private static final String TOTAL_WRITE_DATA = "total_write_data";
private static final String TOTAL_WRITE_BLOCK = "total_write_block";
private static final String WRITE_BLOCK_SIZE = "write_block_size";
private static final String TOTAL_WRITE_TIME = "total_write_time";
private static final String TOTAL_WRITE_HANDLER = "total_write_handler";
private static final String TOTAL_WRITE_EXCEPTION = "total_write_exception";
Expand Down Expand Up @@ -148,6 +153,7 @@ public class ShuffleServerMetrics {
public static Counter.Child counterTotalReceivedDataSize;
public static Counter.Child counterTotalWriteDataSize;
public static Counter.Child counterTotalWriteBlockSize;
public static Histogram appHistogramWriteBlockSize;
public static Counter.Child counterTotalWriteTime;
public static Counter.Child counterWriteException;
public static Counter.Child counterWriteSlow;
Expand Down Expand Up @@ -231,20 +237,28 @@ public class ShuffleServerMetrics {
private static MetricsManager metricsManager;
private static boolean isRegister = false;

public static synchronized void register(CollectorRegistry collectorRegistry, String tags) {
public static synchronized void register(
CollectorRegistry collectorRegistry, String tags, ShuffleServerConf serverConf) {
if (!isRegister) {
ShuffleServerMetrics.tags = tags;
Map<String, String> labels = Maps.newHashMap();
labels.put(Constants.METRICS_TAG_LABEL_NAME, ShuffleServerMetrics.tags);
metricsManager = new MetricsManager(collectorRegistry, labels);
isRegister = true;
setUpMetrics();
setUpMetrics(serverConf);
}
}

/**
 * Registers the shuffle server metrics on {@link CollectorRegistry#defaultRegistry}, using
 * {@link Constants#SHUFFLE_SERVER_VERSION} as the tags value.
 *
 * @param serverConf the server configuration forwarded to the three-argument overload
 */
public static void register(ShuffleServerConf serverConf) {
  final CollectorRegistry registry = CollectorRegistry.defaultRegistry;
  final String versionTag = Constants.SHUFFLE_SERVER_VERSION;
  register(registry, versionTag, serverConf);
}

@VisibleForTesting
public static void register() {
register(CollectorRegistry.defaultRegistry, Constants.SHUFFLE_SERVER_VERSION);
register(
CollectorRegistry.defaultRegistry,
Constants.SHUFFLE_SERVER_VERSION,
new ShuffleServerConf());
}

@VisibleForTesting
Expand Down Expand Up @@ -314,10 +328,16 @@ public static void incHadoopStorageWriteDataSize(String storageHost, long size)
incHadoopStorageWriteDataSize(storageHost, size, false);
}

private static void setUpMetrics() {
private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalReceivedDataSize = metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
counterTotalWriteDataSize = metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
counterTotalWriteBlockSize = metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
appHistogramWriteBlockSize =
metricsManager.addHistogram(
WRITE_BLOCK_SIZE,
ConfigUtils.convertBytesStringToDoubleArray(
serverConf.get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS)),
METRICS_APP_LABEL_NAME);
counterTotalWriteTime = metricsManager.addLabeledCounter(TOTAL_WRITE_TIME);
counterWriteException = metricsManager.addLabeledCounter(TOTAL_WRITE_EXCEPTION);
counterWriteSlow = metricsManager.addLabeledCounter(TOTAL_WRITE_SLOW);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.server.buffer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool;
// appId -> shuffleId -> shuffle size in buffer
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();
private final boolean appBlockSizeMetricEnabled;

public ShuffleBufferManager(
ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean nettyServerEnabled) {
Expand Down Expand Up @@ -130,6 +132,8 @@ public ShuffleBufferManager(
this.hugePartitionMemoryLimitSize =
Math.round(
capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
appBlockSizeMetricEnabled =
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
}

public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
Expand Down Expand Up @@ -180,6 +184,14 @@ public StatusCode cacheShuffleData(
if (!isPreAllocated) {
updateUsedMemory(size);
}
if (appBlockSizeMetricEnabled) {
Arrays.stream(spd.getBlockList())
.forEach(
b -> {
int blockSize = b.getLength();
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
});
}
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
flushSingleBufferIfNecessary(
Expand Down Expand Up @@ -337,6 +349,9 @@ public void removeBuffer(String appId) {
removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet());
shuffleSizeMap.remove(appId);
bufferPool.remove(appId);
if (appBlockSizeMetricEnabled) {
ShuffleServerMetrics.appHistogramWriteBlockSize.remove(appId);
}
}

public synchronized boolean requireMemory(long size, boolean isPreAllocated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,6 +29,7 @@

import com.google.common.collect.RangeMap;
import com.google.common.util.concurrent.Uninterruptibles;
import io.prometheus.client.Collector;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -38,6 +40,7 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.Constants;
Expand Down Expand Up @@ -729,4 +732,45 @@ public void flushBufferTestWhenNotSelectedStorage(@TempDir File tmpDir) throws E
assertEquals(0, shuffleBufferManager.getUsedMemory());
assertEquals(0, shuffleBufferManager.getInFlushSize());
}

/**
 * Caches one block per configured bucket boundary and checks the per-app block size histogram.
 *
 * <p>Blocks are cached smallest first, each created via {@code createData} with the bucket
 * boundary as its size argument, so the cumulative count expected for the i-th smallest
 * bucket is {@code i + 1}.
 *
 * <p>NOTE(review): the bucket assertions only run for samples that carry this test's appId.
 * Whether the fixture enables the block size metric is not visible from this method; if it is
 * disabled no such samples exist and only the family-count assertion is exercised - confirm.
 */
@Test
public void blockSizeMetricsTest() {
  String appId = "blockSizeMetricsTest";
  shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
  ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
  when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
  int shuffleId = 1;
  shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);

  // cache shuffle block data, and record metrics
  double[] buckets =
      ConfigUtils.convertBytesStringToDoubleArray(
          new ShuffleServerConf()
              .get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS));
  double[] sortedBuckets = Arrays.stream(buckets).sorted().toArray();
  for (double bucket : sortedBuckets) {
    StatusCode sc =
        shuffleBufferManager.cacheShuffleData(
            appId, shuffleId, true, createData(0, (int) bucket));
    assertEquals(StatusCode.SUCCESS, sc);
  }

  // check metrics values
  List<Collector.MetricFamilySamples> samples =
      ShuffleServerMetrics.appHistogramWriteBlockSize.collect();
  assertEquals(1, samples.size());
  for (int i = 0; i < sortedBuckets.length; i++) {
    // Bucket samples carry the upper bound as a string label, so compare against the
    // collector's own string form instead of the raw double.
    String upperBound = Collector.doubleToGoString(sortedBuckets[i]);
    double expectedCount = i + 1;
    for (Collector.MetricFamilySamples.Sample s : samples.get(0).samples) {
      // Restrict the check to this app so observations from other apps cannot interfere.
      if (s.labelValues.contains(appId) && s.labelValues.contains(upperBound)) {
        assertEquals(expectedCount, s.value);
      }
    }
  }
}
}

0 comments on commit 89d2f1c

Please sign in to comment.