Skip to content

Commit 795d28f

Browse files
committed
review feedback
1 parent 81f80e2 commit 795d28f

File tree

3 files changed

+30
-43
lines changed

3 files changed

+30
-43
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@
5757
public class ExternalShuffleBlockResolver {
5858
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
5959

60+
private static final ObjectMapper mapper = new ObjectMapper();
61+
private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
62+
private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
63+
6064
// Map containing all registered executors' metadata.
6165
@VisibleForTesting
6266
final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
@@ -339,8 +343,6 @@ public String toString() {
339343
}
340344
}
341345

342-
static ObjectMapper mapper = new ObjectMapper();
343-
344346
private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
345347
// we stick a common prefix on all the keys so we can find them in the DB
346348
String appExecJson = mapper.writeValueAsString(appExecId);
@@ -355,8 +357,6 @@ private static AppExecId parseDbAppExecKey(String s) throws IOException {
355357
return parsed;
356358
}
357359

358-
private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
359-
360360
@VisibleForTesting
361361
static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
362362
throws IOException {
@@ -367,11 +367,11 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
367367
while (itr.hasNext()) {
368368
Map.Entry<byte[], byte[]> e = itr.next();
369369
String key = new String(e.getKey(), Charsets.UTF_8);
370-
if (!key.startsWith(APP_KEY_PREFIX))
370+
if (!key.startsWith(APP_KEY_PREFIX)) {
371371
break;
372+
}
372373
AppExecId id = parseDbAppExecKey(key);
373-
ExecutorShuffleInfo shuffleInfo =
374-
mapper.readValue(new String(e.getValue(), Charsets.UTF_8), ExecutorShuffleInfo.class);
374+
ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
375375
registeredExecutors.put(id, shuffleInfo);
376376
}
377377
}
@@ -387,18 +387,17 @@ public void log(String message) {
387387
}
388388
}
389389

390-
private static final StoreVersion CURRENT_VERSION = new StoreVersion(1,0);
390+
/**
391+
* Simple major.minor versioning scheme. Any incompatible changes should be across major
392+
* versions. Minor version differences are allowed -- meaning we should be able to read
393+
* dbs that are either earlier *or* later on the minor version.
394+
*/
391395
private static void checkVersion(DB db) throws IOException {
392396
byte[] bytes = db.get(StoreVersion.KEY);
393397
if (bytes == null) {
394398
storeVersion(db);
395-
} else if (bytes.length != 8) {
396-
throw new IOException("unexpected version format");
397399
} else {
398-
DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
399-
int major = in.readInt();
400-
int minor = in.readInt();
401-
StoreVersion version = new StoreVersion(major, minor);
400+
StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
402401
if (version.major != CURRENT_VERSION.major) {
403402
throw new IOException("cannot read state DB with version " + version + ", incompatible " +
404403
"with current version " + CURRENT_VERSION);
@@ -408,23 +407,18 @@ private static void checkVersion(DB db) throws IOException {
408407
}
409408

410409
private static void storeVersion(DB db) throws IOException {
411-
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
412-
DataOutputStream out = new DataOutputStream(bytesOut);
413-
out.writeInt(CURRENT_VERSION.major);
414-
out.writeInt(CURRENT_VERSION.minor);
415-
out.close();
416-
db.put(StoreVersion.KEY, bytesOut.toByteArray());
410+
db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
417411
}
418412

419413

420-
private static class StoreVersion {
414+
public static class StoreVersion {
421415

422416
final static byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
423417

424-
final int major;
425-
final int minor;
418+
public final int major;
419+
public final int minor;
426420

427-
StoreVersion(int major, int minor) {
421+
@JsonCreator public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
428422
this.major = major;
429423
this.minor = minor;
430424
}
@@ -436,10 +430,7 @@ public boolean equals(Object o) {
436430

437431
StoreVersion that = (StoreVersion) o;
438432

439-
if (major != that.major) return false;
440-
if (minor != that.minor) return false;
441-
442-
return true;
433+
return major == that.major && minor == that.minor;
443434
}
444435

445436
@Override
@@ -450,6 +441,4 @@ public int hashCode() {
450441
}
451442
}
452443

453-
454-
455444
}

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.InputStream;
2222
import java.io.InputStreamReader;
2323

24+
import com.fasterxml.jackson.databind.ObjectMapper;
2425
import com.google.common.io.CharStreams;
2526
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
2627
import org.apache.spark.network.util.SystemPropertyConfigProvider;
@@ -131,27 +132,25 @@ public void testHashShuffleBlocks() throws IOException {
131132

132133
@Test
133134
public void jsonSerializationOfExecutorRegistration() throws IOException {
135+
ObjectMapper mapper = new ObjectMapper();
134136
AppExecId appId = new AppExecId("foo", "bar");
135-
String appIdJson = ExternalShuffleBlockResolver.mapper.writeValueAsString(appId);
136-
AppExecId parsedAppId =
137-
ExternalShuffleBlockResolver.mapper.readValue(appIdJson, AppExecId.class);
137+
String appIdJson = mapper.writeValueAsString(appId);
138+
AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
138139
assertEquals(parsedAppId, appId);
139140

140141
ExecutorShuffleInfo shuffleInfo =
141142
new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
142-
String shuffleJson = ExternalShuffleBlockResolver.mapper.writeValueAsString(shuffleInfo);
143+
String shuffleJson = mapper.writeValueAsString(shuffleInfo);
143144
ExecutorShuffleInfo parsedShuffleInfo =
144-
ExternalShuffleBlockResolver.mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
145+
mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
145146
assertEquals(parsedShuffleInfo, shuffleInfo);
146147

147148
// Intentionally keep these hard-coded strings in here, to check backwards-compatibility.
148149
// it's not legacy yet, but keeping this here in case anybody changes it
149150
String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
150-
assertEquals(appId,
151-
ExternalShuffleBlockResolver.mapper.readValue(legacyAppIdJson, AppExecId.class));
151+
assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
152152
String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
153153
"\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
154-
assertEquals(shuffleInfo,
155-
ExternalShuffleBlockResolver.mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
154+
assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
156155
}
157156
}

yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
*/
1717
package org.apache.spark.network.yarn
1818

19-
import java.io.{DataOutputStream, FileOutputStream, PrintWriter, File}
19+
import java.io.{DataOutputStream, File, FileOutputStream}
2020

2121
import scala.annotation.tailrec
2222

2323
import org.apache.commons.io.FileUtils
2424
import org.apache.hadoop.yarn.api.records.ApplicationId
2525
import org.apache.hadoop.yarn.conf.YarnConfiguration
26-
import org.apache.hadoop.yarn.server.api.{ApplicationTerminationContext, ApplicationInitializationContext}
26+
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
2727
import org.scalatest.{BeforeAndAfterEach, Matchers}
2828

2929
import org.apache.spark.SparkFunSuite
@@ -33,11 +33,10 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
3333
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
3434
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
3535

36-
3736
override def beforeEach(): Unit = {
3837
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
3938
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
40-
classOf[YarnShuffleService].getCanonicalName);
39+
classOf[YarnShuffleService].getCanonicalName)
4140

4241
yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
4342
val d = new File(dir)

0 commit comments

Comments
 (0)