
Commit 1d02ca4

shuwang21 and otterc authored and committed
[SPARK-43987][SHUFFLE] Separate finalizeShuffleMerge Processing to Dedicated Thread Pools
### What changes were proposed in this pull request?

In this PR, we propose to separate `finalizeShuffleMerge` processing into dedicated thread pools:

1. We introduce `ShuffleTransportContext`, which extends `TransportContext`.
2. We override `initializePipeline` so that a `FinalizedHandler` is added to the channel pipeline when the newly added configuration `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is positive.
3. We override `decode` in `ShuffleMessageDecoder` so that a `FINALIZE_SHUFFLE_MERGE` `RpcRequest` is not processed by the shared IO threads; it is instead wrapped as an `RpcRequestInternal`.
4. The dedicated `FinalizedHandler` attached to the channel pipeline handles only `RpcRequestInternal` messages and runs on its own event loop group (illustrated by the standalone Netty sketch below).

Authors: otterc, shuwang21

### Why are the changes needed?

In our production environment, `finalizeShuffleMerge` processing takes much longer than other RPC requests (p90 is around 20 s), because it performs IO operations such as truncate and file open/close. More importantly, processing `finalizeShuffleMerge` can block other critical lightweight messages such as authentication requests, which can cause authentication timeouts as well as fetch failures. These timeouts and fetch failures hurt the stability of Spark job execution.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. We added related unit tests.
2. We also tested this patch internally. Under the same settings, without the patch the shuffle server can hit authentication timeouts and thus fetch failures; with the patch, `finalizeShuffleMerge` is processed by the new `FinalizedHandler` and authentication RPC requests are not blocked.
3. In terms of performance, we have deployed this internally and saw shuffle fetch delays improve: P80 is reduced by 98.1% (120 s to 2.3 s), P90 by 83% (30 mins to ~5 mins), and P99 by 70% (44 h to 13 h). SASL timeout exceptions are reduced by ~40%. Spark job runtime also improved: P50 is reduced by 35.2% (164 s to 107 s), P80 by 50.1% (15 mins to 7.7 mins), P90 by 45.5% (37 mins to 20 mins), and P99 by 10% (4.5 h to 4.0 h). The lower runtime also translates into resource savings; we are seeing roughly 20% savings.

Closes #41489 from shuwang21/SPARK-43987.

Lead-authored-by: Shu Wang <swang7@linkedin.com>
Co-authored-by: Shu Wang <wangshu1990@gmail.com>
Co-authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent e38d06b commit 1d02ca4
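The mechanism behind item 4 of the change description is a standard Netty idiom: a handler registered with `ChannelPipeline.addLast(EventExecutorGroup, String, ChannelHandler)` has its callbacks executed on that executor group rather than on the channel's I/O event loop. The following self-contained sketch is illustration only (it is not code from this commit); it shows the idiom with an `EmbeddedChannel` and a `DefaultEventLoopGroup`, where the printed thread name comes from the dedicated group instead of the I/O loop.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;

public class PipelineOffloadSketch {
  public static void main(String[] args) throws Exception {
    // Dedicated executor group, analogous to the PR's "finalizeWorkers" EventLoopGroup.
    DefaultEventLoopGroup offloadGroup = new DefaultEventLoopGroup(2);
    EmbeddedChannel channel = new EmbeddedChannel();

    // Because the handler is added with its own executor group, channelRead0 runs on an
    // offloadGroup thread instead of the channel's event loop -- the same trick used to keep
    // FINALIZE_SHUFFLE_MERGE processing off the shared I/O threads.
    channel.pipeline().addLast(offloadGroup, "offload",
        new SimpleChannelInboundHandler<String>() {
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            System.out.println("Handled '" + msg + "' on " + Thread.currentThread().getName());
          }
        });

    channel.writeInbound("finalize-shuffle-merge");
    Thread.sleep(200);  // crude wait so the offloaded handler can print before shutdown
    offloadGroup.shutdownGracefully();
  }
}
```

In the commit itself, `ShuffleTransportContext.addHandlerToPipeline` (shown further below) does exactly this with `finalizeWorkers` and `FinalizedHandler`.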

File tree: 7 files changed (+381, -4 lines)


common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 8 additions & 1 deletion
@@ -17,6 +17,9 @@
 
 package org.apache.spark.network;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
@@ -196,7 +199,7 @@ public TransportChannelHandler initializePipeline(
       pipeline
         .addLast("encoder", ENCODER)
         .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
-        .addLast("decoder", DECODER)
+        .addLast("decoder", getDecoder())
         .addLast("idleStateHandler",
           new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
         // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
@@ -216,6 +219,10 @@ public TransportChannelHandler initializePipeline(
     }
   }
 
+  protected MessageToMessageDecoder<ByteBuf> getDecoder() {
+    return DECODER;
+  }
+
   /**
    * Creates the server- and client-side handler which is used to handle both RequestMessages and
    * ResponseMessages. The channel is expected to have been successfully created, though certain
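The new `getDecoder()` hook is a small template-method extension point: pipeline setup now asks the (possibly subclassed) context which decoder to install. The purely hypothetical subclass below is a sketch of the contract only; `LoggingTransportContext` is not part of this commit, and the real override is `ShuffleTransportContext.getDecoder()` later in the diff.

```java
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.protocol.MessageDecoder;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;

// Hypothetical subclass, for illustration only.
public class LoggingTransportContext extends TransportContext {

  public LoggingTransportContext(TransportConf conf, RpcHandler rpcHandler) {
    super(conf, rpcHandler);
  }

  @Override
  protected MessageToMessageDecoder<ByteBuf> getDecoder() {
    // initializePipeline() installs whatever this method returns as the "decoder" stage.
    // Here we delegate the real decoding to the stock MessageDecoder and just observe the result.
    return new MessageToMessageDecoder<ByteBuf>() {
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)
          throws Exception {
        MessageDecoder.INSTANCE.decode(ctx, buf, out);
        System.out.println("Decoded " + out.size() + " message(s) on " + ctx.channel());
      }
    };
  }
}
```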

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 2 additions & 1 deletion
@@ -35,7 +35,8 @@
  *
  * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
  */
-class MessageWithHeader extends AbstractFileRegion {
+
+public class MessageWithHeader extends AbstractFileRegion {
 
   @Nullable private final ManagedBuffer managedBuffer;
   private final ByteBuf header;

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 4 additions & 0 deletions
@@ -184,6 +184,10 @@ public TransportResponseHandler getResponseHandler() {
     return responseHandler;
   }
 
+  public TransportRequestHandler getRequestHandler() {
+    return requestHandler;
+  }
+
   @Override
   public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
     transportContext.getRegisteredConnections().inc();

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 30 additions & 1 deletion
@@ -20,7 +20,7 @@
 import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import io.netty.util.NettyRuntime;
 
@@ -324,6 +324,35 @@ public boolean separateChunkFetchRequest() {
     return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
   }
 
+  /**
+   * Percentage of io.serverThreads used by netty to process FinalizeShuffleMerge. When the config
+   * `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is set, shuffle server will use a
+   * separate EventLoopGroup to process FinalizeShuffleMerge messages, which are I/O intensive and
+   * could take long time to process due to disk contentions. The number of threads used for
+   * handling finalizeShuffleMerge requests are percentage of io.serverThreads (if defined) else it
+   * is a percentage of 2 * #cores.
+   */
+  public int finalizeShuffleMergeHandlerThreads() {
+    if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+      return 0;
+    }
+    Preconditions.checkArgument(separateFinalizeShuffleMerge(),
+      "Please set spark.shuffle.server.finalizeShuffleMergeThreadsPercent to a positive value");
+    int finalizeShuffleMergeThreadsPercent =
+      Integer.parseInt(conf.get("spark.shuffle.server.finalizeShuffleMergeThreadsPercent"));
+    int threads =
+      this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
+    return (int) Math.ceil(threads * (finalizeShuffleMergeThreadsPercent / 100.0));
+  }
+
+  /**
+   * Whether to use a separate EventLoopGroup to process FinalizeShuffleMerge messages, it is
+   * decided by the config `spark.shuffle.server.finalizeShuffleMergeThreadsPercent` is set or not.
+   */
+  public boolean separateFinalizeShuffleMerge() {
+    return conf.getInt("spark.shuffle.server.finalizeShuffleMergeThreadsPercent", 0) > 0;
+  }
+
   /**
    * Whether to use the old protocol while doing the shuffle block fetching.
    * It is only enabled while we need the compatibility in the scenario of new spark version
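A quick way to see how the two new `TransportConf` methods interact is the sketch below. It is illustrative only: `MapConfigProvider` is assumed to be the in-memory `ConfigProvider` shipped with network-common, and the thread counts are made-up numbers.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class FinalizeThreadsSketch {
  public static void main(String[] args) {
    Map<String, String> settings = new HashMap<>();
    settings.put("spark.shuffle.io.serverThreads", "64");
    settings.put("spark.shuffle.server.finalizeShuffleMergeThreadsPercent", "10");

    // The module name must be "shuffle": finalizeShuffleMergeHandlerThreads() returns 0
    // for any other module.
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(settings));

    System.out.println(conf.separateFinalizeShuffleMerge());       // true, since percent > 0
    System.out.println(conf.finalizeShuffleMergeHandlerThreads()); // ceil(64 * 10 / 100.0) = 7
  }
}
```

If `spark.shuffle.io.serverThreads` is unset, the base for the percentage is `2 * NettyRuntime.availableProcessors()` instead.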
org/apache/spark/network/shuffle/ShuffleTransportContext.java (new file)

Lines changed: 195 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle;

import java.nio.ByteBuffer;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.MessageDecoder;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportChannelHandler;
import org.apache.spark.network.server.TransportRequestHandler;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.util.IOMode;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;

import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

/**
 * Extends {@link TransportContext} to support customized shuffle service. Specifically, we
 * modified the Netty Channel Pipeline so that IO expensive messages such as FINALIZE_SHUFFLE_MERGE
 * are processed in the separate handlers.
 * */
public class ShuffleTransportContext extends TransportContext {
  private static final Logger logger = LoggerFactory.getLogger(ShuffleTransportContext.class);
  private static final ShuffleMessageDecoder SHUFFLE_DECODER =
      new ShuffleMessageDecoder(MessageDecoder.INSTANCE);
  private final EventLoopGroup finalizeWorkers;

  public ShuffleTransportContext(
      TransportConf conf,
      ExternalBlockHandler rpcHandler,
      boolean closeIdleConnections) {
    this(conf, rpcHandler, closeIdleConnections, false);
  }

  public ShuffleTransportContext(TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections,
      boolean isClientOnly) {
    super(conf, rpcHandler, closeIdleConnections, isClientOnly);

    if ("shuffle".equalsIgnoreCase(conf.getModuleName()) && conf.separateFinalizeShuffleMerge()) {
      finalizeWorkers = NettyUtils.createEventLoop(
          IOMode.valueOf(conf.ioMode()),
          conf.finalizeShuffleMergeHandlerThreads(),
          "shuffle-finalize-merge-handler");
      logger.info("finalize shuffle merged workers created");
    } else {
      finalizeWorkers = null;
    }
  }

  @Override
  public TransportChannelHandler initializePipeline(SocketChannel channel) {
    TransportChannelHandler ch = super.initializePipeline(channel);
    addHandlerToPipeline(channel, ch);
    return ch;
  }

  @Override
  public TransportChannelHandler initializePipeline(SocketChannel channel,
      RpcHandler channelRpcHandler) {
    TransportChannelHandler ch = super.initializePipeline(channel, channelRpcHandler);
    addHandlerToPipeline(channel, ch);
    return ch;
  }

  /**
   * Add finalize handler to pipeline if needed. This is needed only when
   * separateFinalizeShuffleMerge is enabled.
   */
  private void addHandlerToPipeline(SocketChannel channel,
      TransportChannelHandler transportChannelHandler) {
    if (finalizeWorkers != null) {
      channel.pipeline().addLast(finalizeWorkers, FinalizedHandler.HANDLER_NAME,
          new FinalizedHandler(transportChannelHandler.getRequestHandler()));
    }
  }

  @Override
  protected MessageToMessageDecoder<ByteBuf> getDecoder() {
    return finalizeWorkers == null ? super.getDecoder() : SHUFFLE_DECODER;
  }

  static class ShuffleMessageDecoder extends MessageToMessageDecoder<ByteBuf> {

    private final MessageDecoder delegate;

    ShuffleMessageDecoder(MessageDecoder delegate) {
      super();
      this.delegate = delegate;
    }

    /**
     * Decode the message and check if it is a finalize merge request. If yes, then create an
     * internal rpc request message and add it to the list of messages to be handled by
     * {@link TransportChannelHandler}
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext,
        ByteBuf byteBuf,
        List<Object> list) throws Exception {
      delegate.decode(channelHandlerContext, byteBuf, list);
      Object msg = list.get(list.size() - 1);
      if (msg instanceof RpcRequest) {
        RpcRequest req = (RpcRequest) msg;
        ByteBuffer buffer = req.body().nioByteBuffer();
        byte type = Unpooled.wrappedBuffer(buffer).readByte();
        if (type == BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE.id()) {
          list.remove(list.size() - 1);
          RpcRequestInternal rpcRequestInternal =
              new RpcRequestInternal(BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE, req);
          logger.trace("Created internal rpc request msg with rpcId {} for finalize merge req",
              req.requestId);
          list.add(rpcRequestInternal);
        }
      }
    }
  }

  /**
   * Internal message to handle rpc requests that is not accepted by
   * {@link TransportChannelHandler} as this message doesn't extend {@link Message}. It will be
   * accepted by {@link FinalizedHandler} instead, which is configured to execute in a separate
   * EventLoopGroup.
   */
  static class RpcRequestInternal {
    public final BlockTransferMessage.Type messageType;
    public final RpcRequest rpcRequest;

    RpcRequestInternal(BlockTransferMessage.Type messageType,
        RpcRequest rpcRequest) {
      this.messageType = messageType;
      this.rpcRequest = rpcRequest;
    }
  }

  static class FinalizedHandler extends SimpleChannelInboundHandler<RpcRequestInternal> {
    private static final Logger logger = LoggerFactory.getLogger(FinalizedHandler.class);
    public static final String HANDLER_NAME = "finalizeHandler";
    private final TransportRequestHandler transportRequestHandler;

    @Override
    public boolean acceptInboundMessage(Object msg) throws Exception {
      if (msg instanceof RpcRequestInternal) {
        RpcRequestInternal rpcRequestInternal = (RpcRequestInternal) msg;
        return rpcRequestInternal.messageType == BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE;
      }
      return false;
    }

    FinalizedHandler(TransportRequestHandler transportRequestHandler) {
      this.transportRequestHandler = transportRequestHandler;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
        RpcRequestInternal req) throws Exception {
      if (logger.isTraceEnabled()) {
        logger.trace("Finalize shuffle req from {} for rpc request {}",
            getRemoteAddress(channelHandlerContext.channel()), req.rpcRequest.requestId);
      }
      this.transportRequestHandler.handle(req.rpcRequest);
    }
  }
}
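The remaining files in the commit (not captured above) wire this context into the shuffle service and add tests. For orientation, here is a rough, hand-written sketch of what such wiring could look like; the registered-executors file path, the port, and the empty bootstrap list are assumptions for illustration, not code from the PR.

```java
import java.io.File;
import java.util.Collections;

import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.shuffle.ShuffleTransportContext;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ShuffleServerSketch {
  public static void main(String[] args) throws Exception {
    // A positive percent turns on the dedicated finalize-merge EventLoopGroup.
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(
        Collections.singletonMap(
            "spark.shuffle.server.finalizeShuffleMergeThreadsPercent", "10")));

    // ExternalBlockHandler is the shuffle service's RpcHandler; the DB file path is made up.
    ExternalBlockHandler blockHandler =
        new ExternalBlockHandler(conf, new File("/tmp/registeredExecutors.ldb"));

    // Swapping TransportContext for ShuffleTransportContext is the integration point:
    // FINALIZE_SHUFFLE_MERGE RPCs now reach FinalizedHandler on the finalize workers.
    ShuffleTransportContext context = new ShuffleTransportContext(conf, blockHandler, true);
    TransportServer server = context.createServer(7337, Collections.emptyList());
    System.out.println("Shuffle server listening on port " + server.getPort());
  }
}
```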
