Skip to content

Commit

Permalink
[SPARK-20642][CORE] Store FsHistoryProvider listing data in a KVStore.
Browse files Browse the repository at this point in the history
The application listing is still generated from event logs, but is now stored
in a KVStore instance. By default an in-memory store is used, but a new config
allows setting a local disk path to store the data, in which case a LevelDB
store will be created.

The provider stores things internally using the public REST API types; I believe
this is better going forward since it will make it easier to get rid of the
internal history server API which is mostly redundant at this point.

I also added a finalizer to LevelDBIterator, to make sure that resources are
eventually released. This helps when code iterates but does not exhaust the
iterator, thus not triggering the auto-close code.

HistoryServerSuite was modified to not re-start the history server unnecessarily;
this makes the json validation tests run more quickly.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18887 from vanzin/SPARK-20642.
  • Loading branch information
Marcelo Vanzin authored and cloud-fan committed Sep 27, 2017
1 parent 9c5935d commit 74daf62
Show file tree
Hide file tree
Showing 10 changed files with 536 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,32 @@ public long count(Class<?> type, String index, Object indexedValue) throws Excep

@Override
public void close() throws IOException {
DB _db = this._db.getAndSet(null);
if (_db == null) {
return;
synchronized (this._db) {
DB _db = this._db.getAndSet(null);
if (_db == null) {
return;
}

try {
_db.close();
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
}
}
}

try {
_db.close();
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
/**
* Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator it) throws IOException {
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
it.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ public synchronized void close() throws IOException {
}
}

/**
* Because it's tricky to expose closeable iterators through many internal APIs, especially
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
@Override
protected void finalize() throws Throwable {
db.closeIterator(this);
}

private byte[] loadNext() {
if (count >= max) {
return null;
Expand Down
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>spark-launcher_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kvstore_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_${scala.binary.version}</artifactId>
Expand Down
Loading

0 comments on commit 74daf62

Please sign in to comment.