@@ -190,6 +190,14 @@ public class RssSparkConfig {
.defaultValue(1)
.withDescription("The block retry max times when partition reassign is enabled.");

+public static final ConfigOption<Boolean>
+RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED =
+ConfigOptions.key("rss.client.reassign.staleAssignmentFastSwitchEnabled")
+.booleanType()
+.defaultValue(true)
+.withDescription(
+"Whether to fast-switch stale shuffle server assignments when pushing shuffle data. It takes effect only when the partition reassign mechanism is enabled.");
+
public static final ConfigOption<Boolean> RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED =
ConfigOptions.key("rss.client.mapSideCombine.enabled")
.booleanType()
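For context, a minimal job-side sketch of the new switch. This is an assumption-laden illustration, not part of the change: the `spark.` prefix follows the usual Uniffle convention for exposing `rss.*` keys to Spark, and `rss.client.reassign.enabled` is the reassign flag the pusher checks below.

import org.apache.spark.SparkConf;

public class FastSwitchConfExample {
  public static void main(String[] args) {
    // Hypothetical job-side settings: the fast switch defaults to true, and it
    // only matters once partition reassignment itself is enabled.
    SparkConf conf = new SparkConf()
        .set("spark.rss.client.reassign.enabled", "true")
        // Opt out of the fast switch while keeping reassignment on.
        .set("spark.rss.client.reassign.staleAssignmentFastSwitchEnabled", "false");
  }
}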
@@ -30,9 +30,11 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -42,8 +44,9 @@
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ThreadUtils;

/**
@@ -64,13 +67,18 @@ public class DataPusher implements Closeable {
// Must be thread safe
private final Set<String> failedTaskIds;

+// Whether to fast-switch away from a stale assignment to avoid backpressure.
+// This only takes effect when partition reassign is enabled and a single replica is used.
+private final boolean staleAssignmentFastSwitchEnabled;
+
public DataPusher(
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
Set<String> failedTaskIds,
int threadPoolSize,
-int threadKeepAliveTime) {
+int threadKeepAliveTime,
+RssConf rssConf) {
this.shuffleWriteClient = shuffleWriteClient;
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
@@ -83,6 +91,28 @@ public DataPusher(
TimeUnit.SECONDS,
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
ThreadUtils.getThreadFactory(this.getClass().getName()));
+this.staleAssignmentFastSwitchEnabled =
+rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED)
+&& rssConf.get(
+RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED);
}

+@VisibleForTesting
+public DataPusher(
+ShuffleWriteClient shuffleWriteClient,
+Map<String, Set<Long>> taskToSuccessBlockIds,
+Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+Set<String> failedTaskIds,
+int threadPoolSize,
+int threadKeepAliveTime) {
+this(
+shuffleWriteClient,
+taskToSuccessBlockIds,
+taskToFailedBlockSendTracker,
+failedTaskIds,
+threadPoolSize,
+threadKeepAliveTime,
+new RssConf());
+}

public CompletableFuture<Long> send(AddBlockEvent event) {
@@ -92,10 +122,8 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
return CompletableFuture.supplyAsync(
() -> {
String taskId = event.getTaskId();
-List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList();
-// filter out the shuffle blocks with stale assignment
-List<ShuffleBlockInfo> validBlocks =
-filterOutStaleAssignmentBlocks(taskId, shuffleBlockInfoList);
+List<ShuffleBlockInfo> blocks = event.getShuffleDataInfoList();
+List<ShuffleBlockInfo> validBlocks = filterOutStaleAssignmentBlocks(taskId, blocks);
if (CollectionUtils.isEmpty(validBlocks)) {
return 0L;
}
@@ -157,6 +185,9 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
*/
private List<ShuffleBlockInfo> filterOutStaleAssignmentBlocks(
String taskId, List<ShuffleBlockInfo> blocks) {
+if (!staleAssignmentFastSwitchEnabled) {
+return blocks;
+}
FailedBlockSendTracker staleBlockTracker = new FailedBlockSendTracker();
List<ShuffleBlockInfo> validBlocks = new ArrayList<>();
for (ShuffleBlockInfo block : blocks) {
@@ -165,9 +196,11 @@ private List<ShuffleBlockInfo> filterOutStaleAssignmentBlocks(
if (servers == null || servers.size() != 1) {
validBlocks.add(block);
} else {
+ShuffleServerInfo server = servers.get(0);
if (block.isStaleAssignment()) {
-staleBlockTracker.add(
-block, block.getShuffleServerInfos().get(0), StatusCode.INTERNAL_ERROR);
+// A null status code marks a block diverted by the stale-assignment
+// fast switch rather than an actual send failure.
+staleBlockTracker.add(block, server, null);
} else {
validBlocks.add(block);
}
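To make the diverting rule above concrete, here is a self-contained sketch of the same decision with simplified stand-in types (Block instead of ShuffleBlockInfo, plain strings for servers); it is an illustration of the technique, not the project's API.

import java.util.ArrayList;
import java.util.List;

class StaleFilterSketch {
  static final class Block {
    final List<String> servers;     // current server assignment
    final boolean staleAssignment;  // true once the writer learned of a reassignment

    Block(List<String> servers, boolean staleAssignment) {
      this.servers = servers;
      this.staleAssignment = staleAssignment;
    }
  }

  // Mirrors filterOutStaleAssignmentBlocks: only single-replica blocks whose
  // assignment is stale are diverted; everything else is pushed as usual.
  static List<Block> filter(boolean fastSwitchEnabled, List<Block> blocks, List<Block> diverted) {
    if (!fastSwitchEnabled) {
      return blocks;
    }
    List<Block> valid = new ArrayList<>();
    for (Block block : blocks) {
      if (block.servers == null || block.servers.size() != 1 || !block.staleAssignment) {
        valid.add(block);
      } else {
        // The real code records the block in a FailedBlockSendTracker with a
        // null status code so the resend path can tell it apart from failures.
        diverted.add(block);
      }
    }
    return valid;
  }
}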
@@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

+import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,14 +50,16 @@ public OverlappingCompressionDataPusher(
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
-int compressionThreads) {
+int compressionThreads,
+RssConf rssConf) {
super(
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockSendTracker,
failedTaskIds,
threadPoolSize,
-threadKeepAliveTime);
+threadKeepAliveTime,
+rssConf);
if (compressionThreads <= 0) {
throw new RssException(
"Invalid rss configuration of "
@@ -69,6 +72,26 @@ public OverlappingCompressionDataPusher(
compressionThreads, ThreadUtils.getThreadFactory("compression-thread"));
}

+@VisibleForTesting
+public OverlappingCompressionDataPusher(
+ShuffleWriteClient shuffleWriteClient,
+Map<String, Set<Long>> taskToSuccessBlockIds,
+Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+Set<String> failedTaskIds,
+int threadPoolSize,
+int threadKeepAliveTime,
+int compressionThreads) {
+this(
+shuffleWriteClient,
+taskToSuccessBlockIds,
+taskToFailedBlockSendTracker,
+failedTaskIds,
+threadPoolSize,
+threadKeepAliveTime,
+compressionThreads,
+new RssConf());
+}

@Override
public CompletableFuture<Long> send(AddBlockEvent event) {
// Step 1: process event data in a separate thread (e.g., trigger compression)
@@ -365,7 +365,8 @@ public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
failedTaskIds,
poolSize,
keepAliveTime,
-threads);
+threads,
+rssConf);
LOG.info(
"Using {} with {} compression threads", dataPusher.getClass().getSimpleName(), threads);
} else {
@@ -376,7 +377,8 @@ public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
taskToFailedBlockSendTracker,
failedTaskIds,
poolSize,
-keepAliveTime);
+keepAliveTime,
+rssConf);
}
}

@@ -29,6 +29,7 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
@@ -38,6 +39,8 @@
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.JavaUtils;

@@ -97,14 +100,18 @@ public void testFilterOutStaleAssignmentBlocks() {
Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
Set<String> failedTaskIds = new HashSet<>();

+RssConf rssConf = new RssConf();
+rssConf.set(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED, true);
+rssConf.set(RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED, true);
DataPusher dataPusher =
new DataPusher(
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockSendTracker,
failedTaskIds,
1,
-2);
+2,
+rssConf);
dataPusher.setRssAppId("testFilterOutStaleAssignmentBlocks");

String taskId = "taskId1";
@@ -26,12 +26,14 @@
import java.util.concurrent.CompletableFuture;

import com.google.common.collect.Maps;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
@@ -51,6 +53,9 @@ public void testSend() {
Set<String> failedTaskIds = new HashSet<>();

RssConf rssConf = new RssConf();
+rssConf.set(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED, true);
+rssConf.set(RssSparkConfig.RSS_PARTITION_REASSIGN_STALE_ASSIGNMENT_FAST_SWITCH_ENABLED, true);
+
int threads = rssConf.get(RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);

// case1: Illegal thread number of compression
@@ -64,7 +69,8 @@
failedTaskIds,
1,
2,
-threads);
+threads,
+rssConf);
});

// case2: Propagated into the underlying data pusher
Expand All @@ -76,7 +82,8 @@ public void testSend() {
failedTaskIds,
1,
2,
-1);
+1,
+rssConf);
pusher.setRssAppId("testSend");

String taskId = "taskId1";
@@ -631,12 +631,18 @@ private void collectFailedBlocksToResend() {
for (Long blockId : failedBlockIds) {
List<TrackingBlockStatus> failedBlockStatus = failedTracker.getFailedBlockStatus(blockId);
synchronized (failedBlockStatus) {
-int retryIndex =
+int retryCnt =
failedBlockStatus.stream()
+.filter(
+x -> {
+// If statusCode is null, the block was resent due to a stale
+// assignment, so it is excluded from the retry-count check.
+return x.getStatusCode() != null;
+})
.map(x -> x.getShuffleBlockInfo().getRetryCnt())
.max(Comparator.comparing(Integer::valueOf))
-.get();
-if (retryIndex >= blockFailSentRetryMaxTimes) {
+.orElse(-1);
+if (retryCnt >= blockFailSentRetryMaxTimes) {
LOG.error(
"Partial blocks for taskId: [{}] retry exceeding the max retry times: [{}]. Fast fail! faulty server list: {}",
taskId,
@@ -862,7 +868,12 @@ private void reassignAndResendBlocks(Set<TrackingBlockStatus> blocks) {
// clear the previous retry state of block
clearFailedBlockState(block);
final ShuffleBlockInfo newBlock = block;
-newBlock.incrRetryCnt();
+// If the status code is null, the block is being resent due to a stale
+// assignment rather than a send failure. In that case the retry count must
+// not be increased; otherwise it could trigger an unexpected fast failure.
+if (blockStatus.getStatusCode() != null) {
+newBlock.incrRetryCnt();
+}
newBlock.reassignShuffleServers(Arrays.asList(replacement));
resendCandidates.add(newBlock);
}
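A self-contained sketch of the retry-count rule above, with Attempt as a hypothetical stand-in for TrackingBlockStatus and an Integer in place of StatusCode. Blocks whose every attempt was a stale-assignment switch never hit the fast-fail path, because orElse(-1) keeps the computed count below any positive maximum.

import java.util.Comparator;
import java.util.List;

class RetrySketch {
  static final class Attempt {
    final Integer statusCode; // null => resent due to a stale assignment, not a failure
    final int retryCnt;

    Attempt(Integer statusCode, int retryCnt) {
      this.statusCode = statusCode;
      this.retryCnt = retryCnt;
    }
  }

  // Mirrors collectFailedBlocksToResend: stale-switch attempts are excluded
  // from the max-retry check, and orElse(-1) keeps a block alive when every
  // recorded attempt was a stale switch rather than a real send failure.
  static boolean exceedsMaxRetries(List<Attempt> attempts, int maxRetries) {
    int retryCnt =
        attempts.stream()
            .filter(a -> a.statusCode != null)
            .map(a -> a.retryCnt)
            .max(Comparator.naturalOrder())
            .orElse(-1);
    return retryCnt >= maxRetries;
  }
}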