Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cross-close the underlying rocksdb session pool #598

Merged
merged 3 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix cross-close the underlying rocksdb session pool
fix: #597
Change-Id: I8b185cd7f81a9a04bc6fd971490ae887fd4ddbb5
  • Loading branch information
javeme committed Jul 3, 2019
commit d4230ce449c521ab13d131655831cfef1f5fd634
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ public Transaction tx() {

@Override
public void close() throws HugeException {
LOG.info("Close graph {}", this);
this.taskManager.closeScheduler(this);
try {
this.closeTx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) {

public abstract String property(String property);

public abstract RocksDBSessions copy(HugeConfig config,
String database, String store);

@Override
public abstract Session session();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
Expand Down Expand Up @@ -63,13 +64,14 @@

public class RocksDBStdSessions extends RocksDBSessions {

private final Map<String, ColumnFamilyHandle> cfs = new HashMap<>();

private final RocksDB rocksdb;
private final SstFileManager sstFileManager;

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store)
private final Map<String, ColumnFamilyHandle> cfs;
private final AtomicInteger refCount;

public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath,String walPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an empty between args

throws RocksDBException {
super(config, database, store);

Expand All @@ -86,10 +88,13 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
* Don't merge old CFs, we expect a clear DB when using this one
*/
this.rocksdb = RocksDB.open(options, dataPath);

this.cfs = new HashMap<>();
this.refCount = new AtomicInteger(1);
}

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store,
public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath, String walPath,
List<String> cfNames) throws RocksDBException {
super(config, database, store);

Expand Down Expand Up @@ -121,13 +126,28 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
"Expect same size of cf-handles and cf-names");

// Collect CF Handles
this.cfs = new HashMap<>();
for (int i = 0; i < cfs.size(); i++) {
this.cfs.put(cfs.get(i), cfhs.get(i));
}

this.refCount = new AtomicInteger(1);

ingestExternalFile();
}

private RocksDBStdSessions(HugeConfig config, String database, String store,
RocksDBStdSessions origin) {
super(config, database, store);

this.rocksdb = origin.rocksdb;
this.sstFileManager = origin.sstFileManager;
this.cfs = origin.cfs;
this.refCount = origin.refCount;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.refCount = origin.refCount.incrementAndGet();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have to share the address of refCount


this.refCount.incrementAndGet();
}

@Override
public void open() throws Exception {
// pass
Expand Down Expand Up @@ -182,6 +202,12 @@ public String property(String property) {
}
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBStdSessions(config, database, store, this);
}

@Override
public final Session session() {
return (Session) super.getOrNewSession();
Expand All @@ -198,6 +224,11 @@ protected final Session newSession() {
protected synchronized void doClose() {
this.checkValid();

if (this.refCount.decrementAndGet() > 0) {
return;
}
assert this.refCount.get() == 0;

for (ColumnFamilyHandle cf : this.cfs.values()) {
cf.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public RocksDBStore(final BackendStoreProvider provider,
private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
List<RocksDBSessions> dbs = new ArrayList<>();
dbs.add(sessions);
dbs.add(this.sessions);
dbs.addAll(tableDBMapping().values());

RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
Expand Down Expand Up @@ -181,10 +181,11 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
if (dbs.containsKey(dataPath)) {
RocksDBSessions origin = dbs.get(dataPath);
if (origin != null) {
if (e.getMessage().contains("No locks available")) {
// Open twice, but we should support keyspace
sessions = dbs.get(dataPath);
sessions = origin.copy(config, this.database, this.store);
}
}

Expand Down Expand Up @@ -222,6 +223,7 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
}

if (sessions != null) {
// May override the original session pool
dbs.put(dataPath, sessions);
sessions.session();
LOG.debug("Store opened: {}", dataPath);
Expand All @@ -235,12 +237,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store);
return new RocksDBStdSessions(config, this.database, this.store,
dataPath, walPath);
} else {
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store,
tableNames);
return new RocksDBStdSessions(config, this.database, this.store,
dataPath, walPath, tableNames);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;

public RocksDBSstSessions(HugeConfig config, String dataPath,
String database, String store) {
public RocksDBSstSessions(HugeConfig config, String database, String store,
String dataPath) {
super(config, database, store);

this.dataPath = dataPath;
Expand All @@ -71,6 +71,14 @@ public RocksDBSstSessions(HugeConfig config, String dataPath,
}
}

private RocksDBSstSessions(HugeConfig config, String database, String store,
RocksDBSstSessions origin) {
super(config, database, store);

this.dataPath = origin.dataPath;
this.tables = origin.tables;
}

@Override
public void open() throws Exception {
// pass
Expand Down Expand Up @@ -110,6 +118,13 @@ public String property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBSstSessions(config, database, store, this);
}


private SstFileWriter table(String table) {
SstFileWriter sst = this.tables.get(table);
if (sst == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store());
return new RocksDBSstSessions(config, this.database(),
this.store(), dataPath);
} else {
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store(), tableNames);
return new RocksDBSstSessions(config, this.database(), this.store(),
dataPath, tableNames);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
.create();
graph.addVertex(T.label, "programmer", T.id, 123456, "name", "marko",
"age", 18, "city", "Beijing");
graph.addVertex(T.label, "programmer", T.id, 61695499031416832L,
"name", "marko", "age", 18, "city", "Beijing");
graph.tx().commit();

List<Vertex> vertices = graph.traversal().V(123456).toList();
Expand All @@ -707,6 +709,16 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");

vertices = graph.traversal().V(61695499031416832L).toList();
Assert.assertEquals(1, vertices.size());
id = vertices.get(0).id();
Assert.assertEquals(IdGenerator.LongId.class, id.getClass());
Assert.assertEquals(61695499031416832L,
((IdGenerator.LongId) id).asLong());
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");
}

@Test
Expand Down