Skip to content

Commit fed407a

Browse files
tedyuMridul Muralidharan
authored and
Mridul Muralidharan
committed
[SPARK-42090][3.2] Introduce sasl retry count in RetryingBlockTransferor
### What changes were proposed in this pull request? This PR introduces sasl retry count in RetryingBlockTransferor. ### Why are the changes needed? Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario: 1. SaslTimeoutException 2. IOException 3. SaslTimeoutException 4. IOException Even though IOException at #2 is retried (resulting in increment of retryCount), the retryCount would be cleared at step #4. Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test is added, courtesy of Mridul. Closes #39611 from tedyu/sasl-cnt. Authored-by: Ted Yu <yuzhihonggmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> Closes #39710 from akpatnam25/SPARK-42090-backport-3.2. Authored-by: Ted Yu <yuzhihong@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent dcd5d75 commit fed407a

File tree

2 files changed

+65
-16
lines changed

2 files changed

+65
-16
lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import com.google.common.annotations.VisibleForTesting;
28+
import com.google.common.base.Preconditions;
2829
import com.google.common.collect.Sets;
2930
import com.google.common.util.concurrent.Uninterruptibles;
3031
import org.slf4j.Logger;
@@ -87,7 +88,16 @@ void createAndStart(String[] blockIds, BlockTransferListener listener)
8788
/** Number of times we've attempted to retry so far. */
8889
private int retryCount = 0;
8990

90-
private boolean saslTimeoutSeen;
91+
// Number of times SASL timeout has been retried without success.
92+
// If we see maxRetries consecutive failures, the request is failed.
93+
// On the other hand, if sasl succeeds and we are able to send other requests subsequently,
94+
// we reduce the SASL failures from retryCount (since SASL failures were part of
95+
// connection bootstrap - which ended up being successful).
96+
// spark.network.auth.rpcTimeout is much lower than spark.network.timeout and others -
97+
// and so sasl is more susceptible to failures when remote service
98+
// (like external shuffle service) is under load: but once it succeeds, we do not want to
99+
// include it as part of request retries.
100+
private int saslRetryCount = 0;
91101

92102
/**
93103
* Set of all block ids which have not been transferred successfully or with a non-IO Exception.
@@ -123,7 +133,7 @@ public RetryingBlockTransferor(
123133
this.currentListener = new RetryingBlockTransferListener();
124134
this.errorHandler = errorHandler;
125135
this.enableSaslRetries = conf.enableSaslRetries();
126-
this.saslTimeoutSeen = false;
136+
this.saslRetryCount = 0;
127137
}
128138

129139
public RetryingBlockTransferor(
@@ -167,7 +177,7 @@ private void transferAllOutstanding() {
167177
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
168178

169179
if (shouldRetry(e)) {
170-
initiateRetry();
180+
initiateRetry(e);
171181
} else {
172182
for (String bid : blockIdsToTransfer) {
173183
listener.onBlockTransferFailure(bid, e);
@@ -180,7 +190,10 @@ private void transferAllOutstanding() {
180190
* Lightweight method which initiates a retry in a different thread. The retry will involve
181191
* calling transferAllOutstanding() after a configured wait time.
182192
*/
183-
private synchronized void initiateRetry() {
193+
private synchronized void initiateRetry(Throwable e) {
194+
if (enableSaslRetries && e instanceof SaslTimeoutException) {
195+
saslRetryCount += 1;
196+
}
184197
retryCount += 1;
185198
currentListener = new RetryingBlockTransferListener();
186199

@@ -203,16 +216,17 @@ private synchronized boolean shouldRetry(Throwable e) {
203216
boolean isIOException = e instanceof IOException
204217
|| e.getCause() instanceof IOException;
205218
boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException;
206-
if (!isSaslTimeout && saslTimeoutSeen) {
207-
retryCount = 0;
208-
saslTimeoutSeen = false;
219+
// If this is a non SASL request failure, reduce earlier SASL failures from retryCount
220+
// since some subsequent SASL attempt was successful
221+
if (!isSaslTimeout && saslRetryCount > 0) {
222+
Preconditions.checkState(retryCount >= saslRetryCount,
223+
"retryCount must be greater than or equal to saslRetryCount");
224+
retryCount -= saslRetryCount;
225+
saslRetryCount = 0;
209226
}
210227
boolean hasRemainingRetries = retryCount < maxRetries;
211228
boolean shouldRetry = (isSaslTimeout || isIOException) &&
212229
hasRemainingRetries && errorHandler.shouldRetryError(e);
213-
if (shouldRetry && isSaslTimeout) {
214-
this.saslTimeoutSeen = true;
215-
}
216230
return shouldRetry;
217231
}
218232

@@ -236,9 +250,13 @@ private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) {
236250
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
237251
outstandingBlocksIds.remove(blockId);
238252
shouldForwardSuccess = true;
239-
if (saslTimeoutSeen) {
240-
retryCount = 0;
241-
saslTimeoutSeen = false;
253+
// If there were SASL failures earlier, remove them from retryCount, as there was
254+
// a SASL success (and some other request post bootstrap was also successful).
255+
if (saslRetryCount > 0) {
256+
Preconditions.checkState(retryCount >= saslRetryCount,
257+
"retryCount must be greater than or equal to saslRetryCount");
258+
retryCount -= saslRetryCount;
259+
saslRetryCount = 0;
242260
}
243261
}
244262
}
@@ -256,7 +274,7 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
256274
synchronized (RetryingBlockTransferor.this) {
257275
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
258276
if (shouldRetry(exception)) {
259-
initiateRetry();
277+
initiateRetry(exception);
260278
} else {
261279
if (errorHandler.shouldLogError(exception)) {
262280
logger.error(

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ public class RetryingBlockTransferorSuite {
5858
private static Map<String, String> configMap;
5959
private static RetryingBlockTransferor _retryingBlockTransferor;
6060

61+
private static final int MAX_RETRIES = 2;
62+
6163
@Before
6264
public void initMap() {
6365
configMap = new HashMap<String, String>() {{
64-
put("spark.shuffle.io.maxRetries", "2");
66+
put("spark.shuffle.io.maxRetries", Integer.toString(MAX_RETRIES));
6567
put("spark.shuffle.io.retryWait", "0");
6668
}};
6769
}
@@ -309,7 +311,7 @@ public void testRepeatedSaslRetryFailures() throws IOException, InterruptedExcep
309311
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
310312
verify(listener, times(3)).getTransferType();
311313
verifyNoMoreInteractions(listener);
312-
assert(_retryingBlockTransferor.getRetryCount() == 2);
314+
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
313315
}
314316

315317
@Test
@@ -341,6 +343,35 @@ public void testBlockTransferFailureAfterSasl() throws IOException, InterruptedE
341343
assert(_retryingBlockTransferor.getRetryCount() == 1);
342344
}
343345

346+
@Test
347+
public void testIOExceptionFailsConnectionEvenWithSaslException()
348+
throws IOException, InterruptedException {
349+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
350+
351+
SaslTimeoutException saslExceptionInitial = new SaslTimeoutException("initial",
352+
new TimeoutException());
353+
SaslTimeoutException saslExceptionFinal = new SaslTimeoutException("final",
354+
new TimeoutException());
355+
IOException ioException = new IOException();
356+
List<? extends Map<String, Object>> interactions = Arrays.asList(
357+
ImmutableMap.of("b0", saslExceptionInitial),
358+
ImmutableMap.of("b0", ioException),
359+
ImmutableMap.of("b0", saslExceptionInitial),
360+
ImmutableMap.of("b0", ioException),
361+
ImmutableMap.of("b0", saslExceptionFinal),
362+
// will not get invoked because the connection fails
363+
ImmutableMap.of("b0", ioException),
364+
// will not get invoked
365+
ImmutableMap.of("b0", block0)
366+
);
367+
configMap.put("spark.shuffle.sasl.enableRetries", "true");
368+
performInteractions(interactions, listener);
369+
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal);
370+
verify(listener, atLeastOnce()).getTransferType();
371+
verifyNoMoreInteractions(listener);
372+
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
373+
}
374+
344375
/**
345376
* Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
346377
* Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction

0 commit comments

Comments
 (0)