Skip to content

[SPARK-19529][BRANCH-1.6] Backport PR #16866 to branch-1.6 #16917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public TransportClientFactory(
*
* Concurrency: This method is safe to call from multiple threads.
*/
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
public TransportClient createClient(String remoteHost, int remotePort)
throws IOException, InterruptedException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
Expand Down Expand Up @@ -176,13 +177,14 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
* As with {@link #createClient(String, int)}, this method is blocking.
*/
public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
throws IOException {
throws IOException, InterruptedException {
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
return createClient(address);
}

/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
private TransportClient createClient(InetSocketAddress address)
throws IOException, InterruptedException {
logger.debug("Creating new connection to " + address);

Bootstrap bootstrap = new Bootstrap();
Expand All @@ -209,7 +211,7 @@ public void initChannel(SocketChannel ch) {
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
if (!cf.await(conf.connectionTimeoutMs())) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
} else if (cf.cause() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void run() {
clients.add(client);
} catch (IOException e) {
failed.incrementAndGet();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
Expand Down Expand Up @@ -140,7 +142,7 @@ public void reuseClientsUpToConfigVariableConcurrent() throws Exception {
}

@Test
public void returnDifferentClientsForDifferentServers() throws IOException {
public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
Expand Down Expand Up @@ -169,7 +171,7 @@ public void neverReturnInactiveClients() throws IOException, InterruptedExceptio
}

@Test
public void closeBlockClientsWithFactory() throws IOException {
public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void fetchBlocks(
new RetryingBlockFetcher.BlockFetchStarter() {
@Override
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
throws IOException {
throws IOException, InterruptedException {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
}
Expand Down Expand Up @@ -136,7 +136,7 @@ public void registerWithShuffleServer(
String host,
int port,
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
TransportClient client = clientFactory.createUnmanagedClient(host, port);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public static interface BlockFetchStarter {
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
* issues.
*/
void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
void createAndStart(String[] blockIds, BlockFetchingListener listener)
throws IOException, InterruptedException;
}

/** Shared executor service used for waiting and retrying. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public MesosExternalShuffleClient(
super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
}

public void registerDriverWithShuffleService(String host, int port) throws IOException {
public void registerDriverWithShuffleService(String host, int port)
throws IOException, InterruptedException {
checkInit();
ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
TransportClient client = clientFactory.createClient(host, port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void afterEach() {
}

@Test
public void testGoodClient() throws IOException {
public void testGoodClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
Lists.<TransportClientBootstrap>newArrayList(
new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testBadClient() {
}

@Test
public void testNoSaslClient() throws IOException {
public void testNoSaslClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
Lists.<TransportClientBootstrap>newArrayList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testFetchNoServer() throws Exception {
}

private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
throws IOException {
throws IOException, InterruptedException {
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
client.init(APP_ID);
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void afterEach() {
}

@Test
public void testValid() throws IOException {
public void testValid() throws IOException, InterruptedException {
validate("my-app-id", "secret", false);
}

Expand All @@ -83,12 +83,13 @@ public void testBadSecret() {
}

@Test
public void testEncryption() throws IOException {
public void testEncryption() throws IOException, InterruptedException {
validate("my-app-id", "secret", true);
}

/** Creates an ExternalShuffleClient and attempts to register with the server. */
private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
private void validate(String appId, String secretKey, boolean encrypt)
throws IOException, InterruptedException {
ExternalShuffleClient client =
new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
client.init(appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void afterEach() {
}

@Test
public void testNoFailures() throws IOException {
public void testNoFailures() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -85,7 +85,7 @@ public void testNoFailures() throws IOException {
}

@Test
public void testUnrecoverableFailure() throws IOException {
public void testUnrecoverableFailure() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -104,7 +104,7 @@ public void testUnrecoverableFailure() throws IOException {
}

@Test
public void testSingleIOExceptionOnFirst() throws IOException {
public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -127,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException {
}

@Test
public void testSingleIOExceptionOnSecond() throws IOException {
public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -149,7 +149,7 @@ public void testSingleIOExceptionOnSecond() throws IOException {
}

@Test
public void testTwoIOExceptions() throws IOException {
public void testTwoIOExceptions() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testTwoIOExceptions() throws IOException {
}

@Test
public void testThreeIOExceptions() throws IOException {
public void testThreeIOExceptions() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testThreeIOExceptions() throws IOException {
}

@Test
public void testRetryAndUnrecoverable() throws IOException {
public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testRetryAndUnrecoverable() throws IOException {
@SuppressWarnings("unchecked")
private static void performInteractions(List<? extends Map<String, Object>> interactions,
BlockFetchingListener listener)
throws IOException {
throws IOException, InterruptedException {

TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
Expand Down