Skip to content

Commit 594d520

Browse files
committed
use json to serialize application executor info
1 parent 1a7980b commit 594d520

File tree

6 files changed

+73
-29
lines changed

6 files changed

+73
-29
lines changed

network/shuffle/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
<version>1.8</version>
5050
</dependency>
5151

52+
<dependency>
53+
<groupId>com.fasterxml.jackson.core</groupId>
54+
<artifactId>jackson-databind</artifactId>
55+
</dependency>
56+
5257
<!-- Provided dependencies -->
5358
<dependency>
5459
<groupId>org.slf4j</groupId>

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
5151
final ExternalShuffleBlockResolver blockManager;
5252
private final OneForOneStreamManager streamManager;
5353

54-
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) {
54+
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException {
5555
this(new OneForOneStreamManager(),
5656
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
5757
}

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import java.util.concurrent.Executor;
2424
import java.util.concurrent.Executors;
2525

26+
import com.fasterxml.jackson.annotation.JsonCreator;
27+
import com.fasterxml.jackson.annotation.JsonProperty;
28+
import com.fasterxml.jackson.databind.ObjectMapper;
2629
import com.google.common.annotations.VisibleForTesting;
2730
import com.google.common.base.Charsets;
2831
import com.google.common.base.Objects;
@@ -138,12 +141,10 @@ public void registerExecutor(
138141
synchronized (executors) {
139142
executors.put(fullId, executorInfo);
140143
try {
141-
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
142-
ObjectOutputStream out = new ObjectOutputStream(bytesOut);
143-
out.writeObject(executorInfo);
144-
out.close();
145144
if (db != null) {
146-
db.put(dbAppExecKey(new AppExecId(appId, execId)), bytesOut.toByteArray());
145+
byte[] key = dbAppExecKey(new AppExecId(appId, execId));
146+
byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
147+
db.put(key, value);
147148
}
148149
} catch (Exception e) {
149150
logger.error("Error saving registered executors", e);
@@ -204,7 +205,11 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
204205
if (appId.equals(fullId.appId)) {
205206
it.remove();
206207
if (db != null) {
207-
db.delete(dbAppExecKey(fullId));
208+
try {
209+
db.delete(dbAppExecKey(fullId));
210+
} catch (IOException e) {
211+
logger.error("Error deleting {} from executor state db", appId, e);
212+
}
208213
}
209214

210215
if (cleanupLocalDirs) {
@@ -301,11 +306,12 @@ void close() {
301306
}
302307

303308
/** Simply encodes an executor's full ID, which is appId + execId. */
304-
public static class AppExecId implements Serializable {
309+
public static class AppExecId {
305310
public final String appId;
306-
final String execId;
311+
public final String execId;
307312

308-
public AppExecId(String appId, String execId) {
313+
@JsonCreator
314+
public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
309315
this.appId = appId;
310316
this.execId = execId;
311317
}
@@ -333,14 +339,20 @@ public String toString() {
333339
}
334340
}
335341

336-
private static byte[] dbAppExecKey(AppExecId appExecId) {
337-
return (APP_KEY_PREFIX + ";" + appExecId.appId + ";" + appExecId.execId).getBytes(Charsets.UTF_8);
342+
static ObjectMapper mapper = new ObjectMapper();
343+
344+
private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
345+
// we stick a common prefix on all the keys so we can find them in the DB
346+
String appExecJson = mapper.writeValueAsString(appExecId);
347+
String key = (APP_KEY_PREFIX + ";" + appExecJson);
348+
return key.getBytes(Charsets.UTF_8);
338349
}
339350

340-
private static AppExecId parseDbAppExecKey(String s) {
351+
private static AppExecId parseDbAppExecKey(String s) throws IOException {
341352
int p = s.indexOf(';');
342-
int p2 = s.indexOf(';', p + 1);
343-
return new AppExecId(s.substring(p + 1, p2), s.substring(p2 + 1));
353+
String json = s.substring(p + 1);
354+
AppExecId parsed = mapper.readValue(json, AppExecId.class);
355+
return parsed;
344356
}
345357

346358
private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
@@ -358,16 +370,9 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
358370
if (!key.startsWith(APP_KEY_PREFIX))
359371
break;
360372
AppExecId id = parseDbAppExecKey(key);
361-
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(e.getValue()));
362-
try {
363-
registeredExecutors.put(
364-
id,
365-
(ExecutorShuffleInfo) in.readObject()
366-
);
367-
} catch (ClassNotFoundException e1) {
368-
throw new IOException(e1);
369-
}
370-
in.close();
373+
ExecutorShuffleInfo shuffleInfo =
374+
mapper.readValue(new String(e.getValue(), Charsets.UTF_8), ExecutorShuffleInfo.class);
375+
registeredExecutors.put(id, shuffleInfo);
371376
}
372377
}
373378
return registeredExecutors;

network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,28 @@
2020
import java.io.Serializable;
2121
import java.util.Arrays;
2222

23+
import com.fasterxml.jackson.annotation.JsonCreator;
24+
import com.fasterxml.jackson.annotation.JsonProperty;
2325
import com.google.common.base.Objects;
2426
import io.netty.buffer.ByteBuf;
2527

2628
import org.apache.spark.network.protocol.Encodable;
2729
import org.apache.spark.network.protocol.Encoders;
2830

2931
/** Contains all configuration necessary for locating the shuffle files of an executor. */
30-
public class ExecutorShuffleInfo implements Encodable, Serializable {
32+
public class ExecutorShuffleInfo implements Encodable {
3133
/** The base set of local directories that the executor stores its shuffle files in. */
3234
public final String[] localDirs;
3335
/** Number of subdirectories created within each localDir. */
3436
public final int subDirsPerLocalDir;
3537
/** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
3638
public final String shuffleManager;
3739

38-
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
40+
@JsonCreator
41+
public ExecutorShuffleInfo(
42+
@JsonProperty("localDirs")String[] localDirs,
43+
@JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir,
44+
@JsonProperty("shuffleManager") String shuffleManager) {
3945
this.localDirs = localDirs;
4046
this.subDirsPerLocalDir = subDirsPerLocalDir;
4147
this.shuffleManager = shuffleManager;

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import java.io.InputStreamReader;
2323

2424
import com.google.common.io.CharStreams;
25+
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
2526
import org.apache.spark.network.util.SystemPropertyConfigProvider;
2627
import org.apache.spark.network.util.TransportConf;
28+
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
2729
import org.junit.AfterClass;
2830
import org.junit.BeforeClass;
2931
import org.junit.Test;
@@ -59,7 +61,7 @@ public static void afterAll() {
5961
}
6062

6163
@Test
62-
public void testBadRequests() {
64+
public void testBadRequests() throws IOException {
6365
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
6466
// Unregistered executor
6567
try {
@@ -126,4 +128,30 @@ public void testHashShuffleBlocks() throws IOException {
126128
block1Stream.close();
127129
assertEquals(hashBlock1, block1);
128130
}
131+
132+
@Test
133+
public void jsonSerializationOfExecutorRegistration() throws IOException {
134+
AppExecId appId = new AppExecId("foo", "bar");
135+
String appIdJson = ExternalShuffleBlockResolver.mapper.writeValueAsString(appId);
136+
AppExecId parsedAppId =
137+
ExternalShuffleBlockResolver.mapper.readValue(appIdJson, AppExecId.class);
138+
assertEquals(parsedAppId, appId);
139+
140+
ExecutorShuffleInfo shuffleInfo =
141+
new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
142+
String shuffleJson = ExternalShuffleBlockResolver.mapper.writeValueAsString(shuffleInfo);
143+
ExecutorShuffleInfo parsedShuffleInfo =
144+
ExternalShuffleBlockResolver.mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
145+
assertEquals(parsedShuffleInfo, shuffleInfo);
146+
147+
// Intentionally keep these hard-coded strings in here, to check backwards-compatability.
148+
// its not legacy yet, but keeping this here in case anybody changes it
149+
String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
150+
assertEquals(appId,
151+
ExternalShuffleBlockResolver.mapper.readValue(legacyAppIdJson, AppExecId.class));
152+
String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
153+
"\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
154+
assertEquals(shuffleInfo,
155+
ExternalShuffleBlockResolver.mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
156+
}
129157
}

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ExternalShuffleSecuritySuite {
4343
TransportServer server;
4444

4545
@Before
46-
public void beforeEach() {
46+
public void beforeEach() throws IOException {
4747
TransportContext context =
4848
new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
4949
TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,

0 commit comments

Comments
 (0)