Skip to content

Commit

Permalink
HugeGraph-387: add async-task framework
Browse files Browse the repository at this point in the history
Change-Id: I452f3a1c9a7ccf1580075e9fc18456b1b4aae1c6
  • Loading branch information
javeme authored and Linary committed Aug 9, 2018
1 parent fed5f76 commit 5787597
Show file tree
Hide file tree
Showing 49 changed files with 1,308 additions and 210 deletions.
2 changes: 1 addition & 1 deletion hugegraph-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.7.0-SNAPSHOT</version>
<version>0.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion hugegraph-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.7.0-SNAPSHOT</version>
<version>0.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class CassandraSessionPool extends BackendSessionPool {
private Cluster cluster;
private String keyspace;

public CassandraSessionPool(String keyspace) {
public CassandraSessionPool(String keyspace, String store) {
super(keyspace + "/" + store);
this.cluster = null;
this.keyspace = keyspace;
}
Expand Down Expand Up @@ -149,9 +150,6 @@ public final class Session extends BackendSession {
public Session() {
this.session = null;
this.batch = new BatchStatement(); // LOGGED
try {
this.open();
} catch (InvalidQueryException ignored) {}
}

public BatchStatement add(Statement statement) {
Expand Down Expand Up @@ -212,16 +210,24 @@ public ResultSet execute(String statement, Object... args) {
return this.session.execute(statement, args);
}

private void tryOpen() {
assert this.session == null;
try {
this.open();
} catch (InvalidQueryException ignored) {}
}

public void open() {
assert this.session == null;
this.session = cluster().connect(keyspace());
}

@Override
public boolean closed() {
if (this.session == null) {
return true;
this.tryOpen();
}
return this.session.isClosed();
return this.session == null ? true : this.session.isClosed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.baidu.hugegraph.backend.store.BackendStoreProvider;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.datastax.driver.core.Cluster;
Expand Down Expand Up @@ -73,7 +72,7 @@ public CassandraStore(final BackendStoreProvider provider,
this.keyspace = keyspace;
this.store = store;

this.sessions = new CassandraSessionPool(this.keyspace);
this.sessions = new CassandraSessionPool(keyspace, store);
this.tables = new ConcurrentHashMap<>();

this.conf = null;
Expand Down Expand Up @@ -495,17 +494,17 @@ public CassandraGraphStore(BackendStoreProvider provider,
super(provider, keyspace, store);

registerTableManager(HugeType.VERTEX,
new CassandraTables.Vertex());
new CassandraTables.Vertex(store));

registerTableManager(HugeType.EDGE_OUT,
new CassandraTables.Edge(Directions.OUT));
CassandraTables.Edge.out(store));
registerTableManager(HugeType.EDGE_IN,
new CassandraTables.Edge(Directions.IN));
CassandraTables.Edge.in(store));

registerTableManager(HugeType.SECONDARY_INDEX,
new CassandraTables.SecondaryIndex());
new CassandraTables.SecondaryIndex(store));
registerTableManager(HugeType.RANGE_INDEX,
new CassandraTables.RangeIndex());
new CassandraTables.RangeIndex(store));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ public void dropTable(CassandraSessionPool.Session session) {

protected void createIndex(CassandraSessionPool.Session session,
String indexLabel, HugeKeys column) {
String indexName = this.table() + "_" + indexLabel;
String indexName = joinTableName(this.table(), indexLabel);
SchemaStatement index = SchemaBuilder.createIndex(indexName)
.ifNotExists()
.onTable(this.table())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@

public class CassandraTables {

public static final String LABEL_INDEX = "label_index";
public static final String NAME_INDEX = "name_index";

private static final DataType DATATYPE_PK = DataType.cint();
private static final DataType DATATYPE_SL = DataType.cint(); // VL/EL
private static final DataType DATATYPE_IL = DataType.cint();
Expand Down Expand Up @@ -142,7 +145,7 @@ public void init(CassandraSessionPool.Session session) {
.build();

this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, "vertex_label_name_index", HugeKeys.NAME);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}

Expand Down Expand Up @@ -175,7 +178,7 @@ public void init(CassandraSessionPool.Session session) {
.build();

this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, "edge_label_name_index", HugeKeys.NAME);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}

Expand All @@ -202,7 +205,7 @@ public void init(CassandraSessionPool.Session session) {
);

this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, "property_key_name_index", HugeKeys.NAME);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}

Expand Down Expand Up @@ -230,16 +233,16 @@ public void init(CassandraSessionPool.Session session) {
);

this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, "index_label_name_index", HugeKeys.NAME);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}

public static class Vertex extends CassandraTable {

public static final String TABLE = "vertices";

public Vertex() {
super(TABLE);
public Vertex(String store) {
super(joinTableName(store, TABLE));
}

@Override
Expand All @@ -255,21 +258,31 @@ public void init(CassandraSessionPool.Session session) {
);

this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, "vertex_label_index", HugeKeys.LABEL);
this.createIndex(session, LABEL_INDEX, HugeKeys.LABEL);
}
}

public static class Edge extends CassandraTable {

public static final String TABLE_PREFIX = "edges";

private final String store;
private final Directions direction;

public Edge(Directions direction) {
super(table(direction));
protected Edge(String store, Directions direction) {
super(joinTableName(store, table(direction)));
this.store = store;
this.direction = direction;
}

protected String edgesTable(Directions direction) {
return joinTableName(this.store, table(direction));
}

protected Directions direction() {
return this.direction;
}

@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
Expand All @@ -293,7 +306,7 @@ public void init(CassandraSessionPool.Session session) {
* by label from out-edges table
*/
if (this.direction == Directions.OUT) {
this.createIndex(session, "edge_label_index", HugeKeys.LABEL);
this.createIndex(session, LABEL_INDEX, HugeKeys.LABEL);
}
}

Expand Down Expand Up @@ -395,9 +408,9 @@ protected void deleteEdgesByLabel(CassandraSessionPool.Session session,
}
}

private static Delete buildDelete(Id label, String ownerVertex,
Directions direction) {
Delete delete = QueryBuilder.delete().from(table(direction));
private Delete buildDelete(Id label, String ownerVertex,
Directions direction) {
Delete delete = QueryBuilder.delete().from(edgesTable(direction));
delete.where(formatEQ(HugeKeys.OWNER_VERTEX, ownerVertex));
delete.where(formatEQ(HugeKeys.DIRECTION, direction.code()));
delete.where(formatEQ(HugeKeys.LABEL, label.asLong()));
Expand Down Expand Up @@ -444,18 +457,26 @@ private CassandraBackendEntry wrapByVertex(CassandraBackendEntry edge) {
return vertex;
}

public static String table(Directions direction) {
private static String table(Directions direction) {
assert direction == Directions.OUT || direction == Directions.IN;
return TABLE_PREFIX + "_" + direction.string();
}

public static CassandraTable out(String store) {
return new Edge(store, Directions.OUT);
}

public static CassandraTable in(String store) {
return new Edge(store, Directions.IN);
}
}

public static class SecondaryIndex extends CassandraTable {

public static final String TABLE = "secondary_indexes";

public SecondaryIndex() {
super(TABLE);
public SecondaryIndex(String store) {
super(joinTableName(store, TABLE));
}

@Override
Expand Down Expand Up @@ -548,8 +569,8 @@ public static class RangeIndex extends CassandraTable {

public static final String TABLE = "range_indexes";

public RangeIndex() {
super(TABLE);
public RangeIndex(String store) {
super(joinTableName(store, TABLE));
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph</artifactId>
<version>0.7.0-SNAPSHOT</version>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>hugegraph-core</artifactId>
Expand Down Expand Up @@ -106,7 +106,7 @@
</manifest>
<manifestEntries>
<Implementation-Version>
0.7.0.0
0.7.1.0
</Implementation-Version>
</manifestEntries>
</archive>
Expand Down
Loading

0 comments on commit 5787597

Please sign in to comment.