Skip to content

Commit 1a7980b

Browse files
committed
version
1 parent 8267d2a commit 1a7980b

File tree

1 file changed

+77
-6
lines changed

1 file changed

+77
-6
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
117117

118118
}
119119
}
120+
// if there is a version mismatch, we throw an exception, which means the service is unusable
121+
checkVersion(tmpDb);
120122
executors = reloadRegisteredExecutors(tmpDb);
121123
db = tmpDb;
122124
} else {
@@ -332,26 +334,31 @@ public String toString() {
332334
}
333335

334336
private static byte[] dbAppExecKey(AppExecId appExecId) {
335-
return (appExecId.appId + ";" + appExecId.execId).getBytes(Charsets.UTF_8);
337+
return (APP_KEY_PREFIX + ";" + appExecId.appId + ";" + appExecId.execId).getBytes(Charsets.UTF_8);
336338
}
337339

338-
private static AppExecId parseDbAppExecKey(byte[] bytes) {
339-
String s = new String(bytes, Charsets.UTF_8);
340+
private static AppExecId parseDbAppExecKey(String s) {
340341
int p = s.indexOf(';');
341-
return new AppExecId(s.substring(0, p), s.substring(p + 1));
342+
int p2 = s.indexOf(';', p + 1);
343+
return new AppExecId(s.substring(p + 1, p2), s.substring(p2 + 1));
342344
}
343345

346+
private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
347+
344348
@VisibleForTesting
345349
static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
346350
throws IOException {
347351
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
348352
if (db != null) {
349353
DBIterator itr = db.iterator();
350-
itr.seekToFirst();
354+
itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
351355
while (itr.hasNext()) {
352356
Map.Entry<byte[], byte[]> e = itr.next();
357+
String key = new String(e.getKey(), Charsets.UTF_8);
358+
if (!key.startsWith(APP_KEY_PREFIX))
359+
break;
360+
AppExecId id = parseDbAppExecKey(key);
353361
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(e.getValue()));
354-
AppExecId id = parseDbAppExecKey(e.getKey());
355362
try {
356363
registeredExecutors.put(
357364
id,
@@ -375,5 +382,69 @@ public void log(String message) {
375382
}
376383
}
377384

385+
private static final StoreVersion CURRENT_VERSION = new StoreVersion(1,0);
386+
private static void checkVersion(DB db) throws IOException {
387+
byte[] bytes = db.get(StoreVersion.KEY);
388+
if (bytes == null) {
389+
storeVersion(db);
390+
} else if (bytes.length != 8) {
391+
throw new IOException("unexpected version format");
392+
} else {
393+
DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
394+
int major = in.readInt();
395+
int minor = in.readInt();
396+
StoreVersion version = new StoreVersion(major, minor);
397+
if (version.major != CURRENT_VERSION.major) {
398+
throw new IOException("cannot read state DB with version " + version + ", incompatible " +
399+
"with current version " + CURRENT_VERSION);
400+
}
401+
storeVersion(db);
402+
}
403+
}
404+
405+
private static void storeVersion(DB db) throws IOException {
406+
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
407+
DataOutputStream out = new DataOutputStream(bytesOut);
408+
out.writeInt(CURRENT_VERSION.major);
409+
out.writeInt(CURRENT_VERSION.minor);
410+
out.close();
411+
db.put(StoreVersion.KEY, bytesOut.toByteArray());
412+
}
413+
414+
415+
private static class StoreVersion {
416+
417+
final static byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
418+
419+
final int major;
420+
final int minor;
421+
422+
StoreVersion(int major, int minor) {
423+
this.major = major;
424+
this.minor = minor;
425+
}
426+
427+
@Override
428+
public boolean equals(Object o) {
429+
if (this == o) return true;
430+
if (o == null || getClass() != o.getClass()) return false;
431+
432+
StoreVersion that = (StoreVersion) o;
433+
434+
if (major != that.major) return false;
435+
if (minor != that.minor) return false;
436+
437+
return true;
438+
}
439+
440+
@Override
441+
public int hashCode() {
442+
int result = major;
443+
result = 31 * result + minor;
444+
return result;
445+
}
446+
}
447+
448+
378449

379450
}

0 commit comments

Comments
 (0)