Skip to content

Commit f33c72b

Browse files
committed
Code review feedback.
1 parent 0fefabb commit f33c72b

File tree

2 files changed

+30
-52
lines changed

2 files changed

+30
-52
lines changed

network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.spark.network.TransportContext;
4444
import org.apache.spark.network.server.TransportChannelHandler;
4545
import org.apache.spark.network.util.IOMode;
46+
import org.apache.spark.network.util.JavaUtils;
4647
import org.apache.spark.network.util.NettyUtils;
4748
import org.apache.spark.network.util.TransportConf;
4849

@@ -59,14 +60,14 @@
5960
public class TransportClientFactory implements Closeable {
6061

6162
/** A simple data structure to track the pool of clients between two peer nodes. */
62-
private class ClientPool {
63+
private static class ClientPool {
6364
TransportClient[] clients;
6465
Object[] locks;
6566

66-
public ClientPool() {
67-
clients = new TransportClient[numConnectionsPerPeer];
68-
locks = new Object[numConnectionsPerPeer];
69-
for (int i = 0; i < numConnectionsPerPeer; i++) {
67+
public ClientPool(int size) {
68+
clients = new TransportClient[size];
69+
locks = new Object[size];
70+
for (int i = 0; i < size; i++) {
7071
locks[i] = new Object();
7172
}
7273
}
@@ -123,7 +124,7 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
123124
// Create the ClientPool if we don't have it yet.
124125
ClientPool clientPool = connectionPool.get(address);
125126
if (clientPool == null) {
126-
connectionPool.putIfAbsent(address, new ClientPool());
127+
connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
127128
clientPool = connectionPool.get(address);
128129
}
129130

@@ -219,11 +220,7 @@ public void close() {
219220
TransportClient client = clientPool.clients[i];
220221
if (client != null) {
221222
clientPool.clients[i] = null;
222-
try {
223-
client.close();
224-
} catch (RuntimeException e) {
225-
logger.warn("Ignoring exception during close", e);
226-
}
223+
JavaUtils.closeQuietly(client);
227224
}
228225
}
229226
}

network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -65,38 +65,10 @@ public void tearDown() {
6565
/**
6666
* Request a bunch of clients to a single server to test
6767
* we create up to maxConnections of clients.
68+
*
69+
* If concurrent is true, create multiple threads to create clients in parallel.
6870
*/
69-
private void testClientReuse(final int maxConnections) throws IOException {
70-
TransportConf conf = new TransportConf(new ConfigProvider() {
71-
@Override
72-
public String get(String name) {
73-
if (name.equals("spark.shuffle.io.numConnectionsPerPeer")) {
74-
return Integer.toString(maxConnections);
75-
} else {
76-
throw new NoSuchElementException();
77-
}
78-
}
79-
});
80-
81-
RpcHandler rpcHandler = new NoOpRpcHandler();
82-
TransportContext context = new TransportContext(conf, rpcHandler);
83-
TransportClientFactory factory = context.createClientFactory();
84-
HashSet<TransportClient> clients = new HashSet<TransportClient>();
85-
for (int i = 0; i < maxConnections * 10; i++) {
86-
TransportClient client = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
87-
assert(client.isActive());
88-
clients.add(client);
89-
}
90-
91-
assert(clients.size() == maxConnections);
92-
}
93-
94-
/**
95-
* Request a bunch of clients to a single server to test
96-
* we create up to maxConnections of clients. This is a parallel
97-
* version of testClientReuse.
98-
*/
99-
private void testClientReuseConcurrent(final int maxConnections)
71+
private void testClientReuse(final int maxConnections, boolean concurrent)
10072
throws IOException, InterruptedException {
10173
TransportConf conf = new TransportConf(new ConfigProvider() {
10274
@Override
@@ -133,7 +105,12 @@ public void run() {
133105
}
134106
}
135107
};
136-
attempts[i].run();
108+
109+
if (concurrent) {
110+
attempts[i].start();
111+
} else {
112+
attempts[i].run();
113+
}
137114
}
138115

139116
// Wait until all the threads complete.
@@ -143,22 +120,26 @@ public void run() {
143120

144121
assert(failed.get() == 0);
145122
assert(clients.size() == maxConnections);
123+
124+
for (TransportClient client : clients) {
125+
client.close();
126+
}
146127
}
147128

148129
@Test
149-
public void reuseClientsUpToConfigVariable() throws IOException {
150-
testClientReuse(1);
151-
testClientReuse(2);
152-
testClientReuse(3);
153-
testClientReuse(4);
130+
public void reuseClientsUpToConfigVariable() throws Exception {
131+
testClientReuse(1, false);
132+
testClientReuse(2, false);
133+
testClientReuse(3, false);
134+
testClientReuse(4, false);
154135
}
155136

156137
@Test
157138
public void reuseClientsUpToConfigVariableConcurrent() throws Exception {
158-
testClientReuseConcurrent(1);
159-
testClientReuseConcurrent(2);
160-
testClientReuseConcurrent(3);
161-
testClientReuseConcurrent(4);
139+
testClientReuse(1, true);
140+
testClientReuse(2, true);
141+
testClientReuse(3, true);
142+
testClientReuse(4, true);
162143
}
163144

164145
@Test

0 commit comments

Comments
 (0)