
Commit f50c437

JoshRosen authored and liancheng committed
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has a bad impact on task cancellation when `interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces it with a regular, interruptible `await()` call. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and the change doesn't significantly impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` in an `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOException`s as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, but those have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller.

This patch was tested manually.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b)

Signed-off-by: Cheng Lian <lian@databricks.com>
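To make the core change concrete, here is a minimal standalone sketch (not Spark's actual code; the class name, method names, and timeout handling are illustrative) contrasting Netty's uninterruptible and interruptible waits on a connect future:

```java
import java.io.IOException;
import java.net.InetSocketAddress;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;

public class ConnectWaitSketch {

  // Old behavior: a thread blocked here ignores Thread.interrupt(), so a
  // cancelled task keeps waiting until the connection attempt times out.
  static void connectUninterruptibly(Bootstrap bootstrap, InetSocketAddress address,
      long timeoutMs) throws IOException {
    ChannelFuture cf = bootstrap.connect(address);
    if (!cf.awaitUninterruptibly(timeoutMs)) {
      throw new IOException("Connecting to " + address + " timed out (" + timeoutMs + " ms)");
    }
  }

  // New behavior: await() throws InterruptedException, so an interrupted
  // (e.g. cancelled) task exits the wait immediately instead of becoming a zombie.
  static void connectInterruptibly(Bootstrap bootstrap, InetSocketAddress address,
      long timeoutMs) throws IOException, InterruptedException {
    ChannelFuture cf = bootstrap.connect(address);
    if (!cf.await(timeoutMs)) {
      throw new IOException("Connecting to " + address + " timed out (" + timeoutMs + " ms)");
    }
  }
}
```

With the second form, a task interrupted during cancellation unblocks immediately and the `InterruptedException` propagates to the caller, rather than the thread lingering until the connection timeout expires.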
1 parent 23050c8 commit f50c437

9 files changed, with 30 additions and 24 deletions.

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 6 additions & 4 deletions
```diff
@@ -122,7 +122,8 @@ public TransportClientFactory(
    *
    * Concurrency: This method is safe to call from multiple threads.
    */
-  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+  public TransportClient createClient(String remoteHost, int remotePort)
+      throws IOException, InterruptedException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     // Use unresolved address here to avoid DNS resolution each time we creates a client.
@@ -190,13 +191,14 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
    * As with {@link #createClient(String, int)}, this method is blocking.
    */
   public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
-      throws IOException {
+      throws IOException, InterruptedException {
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
     return createClient(address);
   }
 
   /** Create a completely new {@link TransportClient} to the remote address. */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
+  private TransportClient createClient(InetSocketAddress address)
+      throws IOException, InterruptedException {
     logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
@@ -223,7 +225,7 @@ public void initChannel(SocketChannel ch) {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+    if (!cf.await(conf.connectionTimeoutMs())) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
     } else if (cf.cause() != null) {
```
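Because `createClient()` now declares `InterruptedException`, callers that cannot let a checked exception propagate have to handle it themselves; the test suite below simply rethrows it as a `RuntimeException`. A minimal sketch of that pattern (the helper name is hypothetical, not part of this patch), which also restores the thread's interrupt status so outer code can still observe the interruption:

```java
import java.io.IOException;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;

public class ClientCallerSketch {

  // Hypothetical helper: connects without declaring checked exceptions,
  // but preserves the interrupt flag when the wait is interrupted.
  static TransportClient connectOrFail(TransportClientFactory factory,
      String remoteHost, int remotePort) {
    try {
      return factory.createClient(remoteHost, remotePort);
    } catch (IOException e) {
      throw new RuntimeException("Failed to connect to " + remoteHost + ":" + remotePort, e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // restore interrupt status for callers
      throw new RuntimeException("Interrupted while connecting to " + remoteHost, e);
    }
  }
}
```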

common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -100,6 +100,8 @@ public void run() {
             clients.add(client);
           } catch (IOException e) {
             failed.incrementAndGet();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
           }
         }
       };
@@ -143,7 +145,7 @@ public void reuseClientsUpToConfigVariableConcurrent() throws Exception {
   }
 
   @Test
-  public void returnDifferentClientsForDifferentServers() throws IOException {
+  public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -172,7 +174,7 @@ public void neverReturnInactiveClients() throws IOException, InterruptedExceptio
   }
 
   @Test
-  public void closeBlockClientsWithFactory() throws IOException {
+  public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
```

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -101,7 +101,7 @@ public void fetchBlocks(
       new RetryingBlockFetcher.BlockFetchStarter() {
         @Override
         public void createAndStart(String[] blockIds, BlockFetchingListener listener)
-            throws IOException {
+            throws IOException, InterruptedException {
           TransportClient client = clientFactory.createClient(host, port);
           new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
         }
@@ -136,7 +136,7 @@ public void registerWithShuffleServer(
       String host,
       int port,
       String execId,
-      ExecutorShuffleInfo executorInfo) throws IOException {
+      ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
     checkInit();
     TransportClient client = clientFactory.createUnmanagedClient(host, port);
     try {
```

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -57,7 +57,8 @@ public interface BlockFetchStarter {
      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+        throws IOException, InterruptedException;
   }
 
   /** Shared executor service used for waiting and retrying. */
```
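The commit message's argument against wrapping `InterruptedException` in `IOException` rests on how the retry logic classifies failures. The following simplified sketch (illustrative only, not the real `RetryingBlockFetcher` implementation; `shouldRetry`, `retryCount`, and `maxRetries` are hypothetical names) shows why a wrapped interruption could look like a retryable I/O failure:

```java
import java.io.IOException;

public class RetryClassificationSketch {

  // Simplified illustration: I/O failures are assumed to be transient
  // and therefore eligible for retry.
  static boolean shouldRetry(Throwable t, int retryCount, int maxRetries) {
    boolean isIOException = t instanceof IOException
        || (t.getCause() != null && t.getCause() instanceof IOException);
    return isIOException && retryCount < maxRetries;
  }

  public static void main(String[] args) {
    // Wrapping an InterruptedException in an IOException would make a
    // cancelled fetch look retryable, prolonging the zombie task.
    IOException wrapped = new IOException(new InterruptedException("task cancelled"));
    System.out.println(shouldRetry(wrapped, 0, 3));                                     // true
    System.out.println(shouldRetry(new InterruptedException("task cancelled"), 0, 3));  // false
  }
}
```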

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -69,7 +69,7 @@ public void registerDriverWithShuffleService(
       String host,
       int port,
       long heartbeatTimeoutMs,
-      long heartbeatIntervalMs) throws IOException {
+      long heartbeatIntervalMs) throws IOException, InterruptedException {
 
     checkInit();
     ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
```

common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -103,7 +103,7 @@ public void afterEach() {
   }
 
   @Test
-  public void testGoodClient() throws IOException {
+  public void testGoodClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList(
         new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -133,7 +133,7 @@ public void testBadClient() {
   }
 
   @Test
-  public void testNoSaslClient() throws IOException {
+  public void testNoSaslClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList());
```
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public void testFetchNoServer() throws Exception {
240240
}
241241

242242
private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
243-
throws IOException {
243+
throws IOException, InterruptedException {
244244
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
245245
client.init(APP_ID);
246246
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java

Lines changed: 4 additions & 3 deletions
```diff
@@ -59,7 +59,7 @@ public void afterEach() {
   }
 
   @Test
-  public void testValid() throws IOException {
+  public void testValid() throws IOException, InterruptedException {
     validate("my-app-id", "secret", false);
   }
 
@@ -82,12 +82,13 @@ public void testBadSecret() {
   }
 
   @Test
-  public void testEncryption() throws IOException {
+  public void testEncryption() throws IOException, InterruptedException {
     validate("my-app-id", "secret", true);
   }
 
   /** Creates an ExternalShuffleClient and attempts to register with the server. */
-  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+  private void validate(String appId, String secretKey, boolean encrypt)
+      throws IOException, InterruptedException {
     ExternalShuffleClient client =
       new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
     client.init(appId);
```

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java

Lines changed: 8 additions & 8 deletions
```diff
@@ -66,7 +66,7 @@ public void afterEach() {
   }
 
   @Test
-  public void testNoFailures() throws IOException {
+  public void testNoFailures() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -85,7 +85,7 @@ public void testNoFailures() throws IOException {
   }
 
   @Test
-  public void testUnrecoverableFailure() throws IOException {
+  public void testUnrecoverableFailure() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -104,7 +104,7 @@ public void testUnrecoverableFailure() throws IOException {
   }
 
   @Test
-  public void testSingleIOExceptionOnFirst() throws IOException {
+  public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -127,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException {
   }
 
   @Test
-  public void testSingleIOExceptionOnSecond() throws IOException {
+  public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -149,7 +149,7 @@ public void testSingleIOExceptionOnSecond() throws IOException {
   }
 
   @Test
-  public void testTwoIOExceptions() throws IOException {
+  public void testTwoIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -177,7 +177,7 @@ public void testTwoIOExceptions() throws IOException {
   }
 
   @Test
-  public void testThreeIOExceptions() throws IOException {
+  public void testThreeIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -209,7 +209,7 @@ public void testThreeIOExceptions() throws IOException {
   }
 
   @Test
-  public void testRetryAndUnrecoverable() throws IOException {
+  public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -252,7 +252,7 @@ public void testRetryAndUnrecoverable() throws IOException {
   @SuppressWarnings("unchecked")
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                           BlockFetchingListener listener)
-      throws IOException {
+      throws IOException, InterruptedException {
 
     TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
```
