
Update cache from raft StateMachine (apache#1119)
Change-Id: I743de35eb35409ebf4ff2507564cfe2c82c79fe9
Linary authored Jul 29, 2020
1 parent 5f60a29 commit 2df4f68
Showing 32 changed files with 217 additions and 137 deletions.
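The commit's intent, taken from the title and the hunks below, is that cache invalidation is now driven from the raft StateMachine once a replicated mutation has been applied, and the invalidation events carry the element type plus its original id. The state-machine file itself is among the diffs not loaded in this view, so the following is only a toy, self-contained illustration of that flow; every name in it is invented for the sketch and none of it is HugeGraph API.

// Toy sketch: apply a replicated mutation, then tell the local caches which
// element changed, mirroring the 3-argument "invalid" event (action, type, id)
// that the cache listeners below now expect.
public class ApplyThenInvalidate {

    enum ElementType { VERTEX, EDGE, SCHEMA }

    interface CacheNotifier {
        void invalid(ElementType type, long id);
    }

    static void apply(byte[] mutation, ElementType type, long id,
                      CacheNotifier notifier) {
        // 1. Apply the mutation to the local backend store (omitted here).
        // 2. Invalidate the cached element, so every replica that applies
        //    this log entry keeps its cache consistent with its store.
        notifier.invalid(type, id);
    }

    public static void main(String[] args) {
        CacheNotifier notifier = (type, id) ->
                System.out.println("invalid " + type + " " + id);
        apply(new byte[0], ElementType.VERTEX, 42L, notifier);
    }
}
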
2 changes: 1 addition & 1 deletion hugegraph-api/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hugegraph</artifactId>
         <groupId>com.baidu.hugegraph</groupId>
-        <version>0.11.1</version>
+        <version>0.11.2</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion hugegraph-cassandra/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hugegraph</artifactId>
         <groupId>com.baidu.hugegraph</groupId>
-        <version>0.11.1</version>
+        <version>0.11.2</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

4 changes: 2 additions & 2 deletions hugegraph-core/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>com.baidu.hugegraph</groupId>
         <artifactId>hugegraph</artifactId>
-        <version>0.11.1</version>
+        <version>0.11.2</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>hugegraph-core</artifactId>
@@ -193,7 +193,7 @@
                     </addDefaultSpecificationEntries>
                 </manifest>
                 <manifestEntries>
-                    <Implementation-Version>0.11.1.0</Implementation-Version>
+                    <Implementation-Version>0.11.2.0</Implementation-Version>
                 </manifestEntries>
             </archive>
         </configuration>

(changed file, path not shown)
@@ -366,7 +366,7 @@ private GraphTransaction graphTransaction() {
     }

     private BackendStoreProvider loadStoreProvider() {
-        return BackendProviderFactory.open(this.configuration);
+        return BackendProviderFactory.open(this.params);
     }

     private AbstractSerializer serializer() {

(changed file, path not shown)
@@ -135,16 +135,21 @@ private void listenChanges() {
         this.cacheEventListener = event -> {
             LOG.debug("Graph {} received graph cache event: {}",
                       this.graph(), event);
-            event.checkArgs(String.class, Id.class);
+            event.checkArgs(String.class, HugeType.class, Id.class);
             Object[] args = event.args();
             if ("invalid".equals(args[0])) {
-                Id id = (Id) args[1];
-                if (this.verticesCache.get(id) != null) {
+                HugeType type = (HugeType) args[1];
+                Id id = (Id) args[2];
+                if (type.isVertex()) {
                     // Invalidate vertex cache
                     this.verticesCache.invalidate(id);
-                } else if (this.edgesCache.get(id) != null) {
-                    // Invalidate edge cache
-                    this.edgesCache.invalidate(id);
+                } else if (type.isEdge()) {
+                    /*
+                     * Invalidate edge cache via clear instead of invalidate
+                     * because of the cacheKey is QueryId not EdgeId
+                     */
+                    // this.edgesCache.invalidate(id);
+                    this.edgesCache.clear();
                 }
                 return true;
             } else if ("clear".equals(args[0])) {
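
With the type carried on the event, the listener above no longer has to probe both caches to discover what kind of element changed; and because the edge cache is keyed by query id rather than edge id, an edge change clears that cache wholesale instead of evicting one entry. A self-contained toy version of that trade-off follows (plain maps, not HugeGraph's cache classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy illustration of the invalidation strategy above: the vertex cache is
// keyed by vertex id, so a single entry can be dropped; the edge cache is
// keyed by query id, so an edge change cannot be mapped to its entries and
// the whole cache is cleared instead.
public class CacheInvalidation {

    enum Kind { VERTEX, EDGE }

    static final Map<Long, Object> VERTICES = new ConcurrentHashMap<>();
    static final Map<String, Object> EDGE_QUERIES = new ConcurrentHashMap<>();

    static void onInvalid(Kind kind, long id) {
        if (kind == Kind.VERTEX) {
            VERTICES.remove(id);      // precise invalidation by id
        } else {
            EDGE_QUERIES.clear();     // keys are query ids, not edge ids
        }
    }

    public static void main(String[] args) {
        VERTICES.put(1L, "v1");
        EDGE_QUERIES.put("out-edges-of-1", "cached result");
        onInvalid(Kind.EDGE, 100L);
        System.out.println(VERTICES.size() + " vertices cached, " +
                           EDGE_QUERIES.size() + " edge queries cached");
    }
}
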
(changed file, path not shown)
@@ -106,10 +106,12 @@ private void listenChanges() {
         this.cacheEventListener = event -> {
             LOG.debug("Graph {} received schema cache event: {}",
                       this.graph(), event);
-            event.checkArgs(String.class, Id.class);
+            event.checkArgs(String.class, HugeType.class, Id.class);
             Object[] args = event.args();
             if ("invalid".equals(args[0])) {
-                Id id = (Id) args[1];
+                HugeType type = (HugeType) args[1];
+                Id id = (Id) args[2];
+                id = generateId(type, id);
                 Object value = this.idCache.get(id);
                 if (value != null) {
                     // Invalidate id cache
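
The schema listener additionally rewrites the incoming id through generateId(type, id) before touching the id cache, since a single cache holds every schema type and the raw id alone is not a unique key. The real generateId is not shown in this diff; a toy version of such a composite key looks like this:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy illustration: schema elements of different types may share numeric ids,
// so the cache key must combine the type with the id, which is what a helper
// like generateId(type, id) presumably provides.
public class SchemaIdCacheKey {

    enum SchemaType { PROPERTY_KEY, VERTEX_LABEL, EDGE_LABEL, INDEX_LABEL }

    static final Map<String, Object> ID_CACHE = new ConcurrentHashMap<>();

    static String generateKey(SchemaType type, long id) {
        return type.name() + ":" + id;   // e.g. "VERTEX_LABEL:3"
    }

    public static void main(String[] args) {
        ID_CACHE.put(generateKey(SchemaType.VERTEX_LABEL, 3L), "person");
        ID_CACHE.put(generateKey(SchemaType.EDGE_LABEL, 3L), "knows");
        // Invalidate only the vertex label with id 3, not the edge label
        ID_CACHE.remove(generateKey(SchemaType.VERTEX_LABEL, 3L));
        System.out.println(ID_CACHE.keySet());
    }
}
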
(changed file, path not shown)
@@ -65,6 +65,11 @@ public BinaryId id() {
         return this.id;
     }

+    @Override
+    public Id originId() {
+        return this.id.origin();
+    }
+
     @Override
     public Id subId() {
         return this.subId;
(changed file, path not shown)
@@ -164,6 +164,11 @@ public Id id() {
         return this.row.id;
     }

+    @Override
+    public Id originId() {
+        return this.row.id;
+    }
+
     public void id(Id id) {
         this.row.id = id;
     }
(changed file, path not shown)
@@ -63,6 +63,11 @@ public Id id() {
         return this.id;
     }

+    @Override
+    public Id originId() {
+        return this.id;
+    }
+
     @Override
     public Id subId() {
         return this.subId;
(changed file, path not shown)
@@ -77,6 +77,8 @@ public boolean equals(Object obj) {
     @Override
     public Id id();

+    public Id originId();
+
     public Id subId();

     public long ttl();
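
Three BackendEntry implementations and the interface itself gain originId() in the hunks above. Judging from those hunks, the id stored on an entry may be a derived form (a BinaryId wrapping the element id, or a query-scoped row id), while cache invalidation needs the element's original Id; originId() exposes that. A standalone sketch of the distinction, with toy types rather than HugeGraph's Id classes:

// Toy illustration of why originId() is useful: the id an entry carries for
// storage purposes can wrap or differ from the element's original id, and
// cache invalidation must use the original one.
public class OriginIdSketch {

    interface Id {
        String asString();
    }

    // A storage-oriented id that wraps the original element id,
    // loosely modelled on BinaryId#origin() seen in the first hunk above.
    static final class WrappedId implements Id {
        private final byte[] encoded;
        private final Id origin;

        WrappedId(byte[] encoded, Id origin) {
            this.encoded = encoded;
            this.origin = origin;
        }

        public Id origin() {
            return this.origin;
        }

        @Override
        public String asString() {
            return "encoded(" + this.encoded.length + " bytes)";
        }
    }

    public static void main(String[] args) {
        Id original = () -> "person:1";
        WrappedId stored = new WrappedId(new byte[]{1, 2, 3}, original);
        // The cache is keyed by the original id, so invalidation uses origin()
        System.out.println("store key = " + stored.asString());
        System.out.println("cache key = " + stored.origin().asString());
    }
}
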
(changed file, path not shown)
@@ -24,6 +24,7 @@

 import org.slf4j.Logger;

+import com.baidu.hugegraph.HugeGraphParams;
 import com.baidu.hugegraph.backend.BackendException;
 import com.baidu.hugegraph.backend.store.memory.InMemoryDBStoreProvider;
 import com.baidu.hugegraph.backend.store.raft.RaftBackendStoreProvider;
@@ -41,12 +42,26 @@ public class BackendProviderFactory {
         providers = new ConcurrentHashMap<>();
     }

-    public static BackendStoreProvider open(HugeConfig config) {
-        String backend = config.get(CoreOptions.BACKEND);
+    public static BackendStoreProvider open(HugeGraphParams params) {
+        HugeConfig config = params.configuration();
+        String backend = config.get(CoreOptions.BACKEND).toLowerCase();
         String graph = config.get(CoreOptions.STORE);
         boolean raftMode = config.get(CoreOptions.RAFT_MODE);

+        BackendStoreProvider provider = newProvider(config);
+        if (raftMode) {
+            LOG.info("Opening backend store '{}' in raft mode for graph '{}'",
+                     backend, graph);
+            provider = new RaftBackendStoreProvider(provider, params);
+        }
+        provider.open(graph);
+        return provider;
+    }
+
+    private static BackendStoreProvider newProvider(HugeConfig config) {
+        String backend = config.get(CoreOptions.BACKEND).toLowerCase();
+        String graph = config.get(CoreOptions.STORE);
+
-        backend = backend.toLowerCase();
         if (InMemoryDBStoreProvider.matchType(backend)) {
             return InMemoryDBStoreProvider.instance(graph);
         }
@@ -67,19 +82,7 @@ public static BackendStoreProvider open(HugeConfig config) {
                      "BackendStoreProvider with type '%s' " +
                      "can't be opened by key '%s'",
                      instance.type(), backend);
-
-        BackendStoreProvider provider;
-        if (raftMode) {
-            LOG.info("Opening backend store '{}' in raft mode for graph '{}'",
-                     backend, graph);
-            provider = new RaftBackendStoreProvider(instance, config);
-        } else {
-            LOG.info("Opening backend store '{}' for graph '{}'",
-                     backend, graph);
-            provider = instance;
-        }
-        provider.open(graph);
-        return provider;
+        return instance;
     }

     @SuppressWarnings({ "rawtypes", "unchecked" })
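
open() is reshaped above into a small factory plus decorator: newProvider(config) builds the concrete provider, and when raft mode is on it is wrapped in a RaftBackendStoreProvider before open(graph) is called exactly once on the result. The same shape in a self-contained toy, with names invented for the sketch:

// Toy illustration of the factory + decorator shape used by open() above:
// build the concrete provider first, then optionally wrap it for raft mode,
// and only open the (possibly wrapped) provider once.
public class ProviderFactorySketch {

    interface StoreProvider {
        void open(String graph);
    }

    static final class LocalProvider implements StoreProvider {
        @Override
        public void open(String graph) {
            System.out.println("open local store for " + graph);
        }
    }

    static final class RaftWrappedProvider implements StoreProvider {
        private final StoreProvider delegate;

        RaftWrappedProvider(StoreProvider delegate) {
            this.delegate = delegate;
        }

        @Override
        public void open(String graph) {
            System.out.println("open raft group for " + graph);
            this.delegate.open(graph);
        }
    }

    static StoreProvider open(String graph, boolean raftMode) {
        StoreProvider provider = new LocalProvider();
        if (raftMode) {
            provider = new RaftWrappedProvider(provider);
        }
        provider.open(graph);
        return provider;
    }

    public static void main(String[] args) {
        open("hugegraph", true);
    }
}
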
(changed file, path not shown)
@@ -36,7 +36,6 @@
 import com.baidu.hugegraph.backend.store.BackendMutation;
 import com.baidu.hugegraph.backend.store.BackendStore;
 import com.baidu.hugegraph.backend.store.BackendStoreProvider;
-import com.baidu.hugegraph.config.CoreOptions;
 import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.util.Log;
@@ -209,10 +208,11 @@ private Object submitAndWait(StoreCommand command) {
     }

     private Object queryByRaft(Query query, Function<Object, Object> func) {
-        if (!this.context.config().get(CoreOptions.RAFT_SAFE_READ)) {
+        if (!this.context.isSafeRead()) {
             return func.apply(query);
         }

+        this.node().waitLeaderElected();
         StoreCommand command = new StoreCommand(StoreAction.QUERY);
         StoreClosure closure = new StoreClosure(command);
         RaftNode raftNode = this.node();
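
queryByRaft() now consults context.isSafeRead() and, before routing a read through raft, waits until a leader has been elected. A standalone sketch of that wait-for-leader guard with a timeout (toy code, not the RaftNode implementation):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Toy illustration of waitLeaderElected(): poll for a known leader until a
// deadline, then fail the read instead of blocking forever.
public class WaitLeaderSketch {

    static void waitLeaderElected(Supplier<String> leaderId,
                                  long timeoutMs) throws TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (leaderId.get() == null) {
            if (System.currentTimeMillis() > deadline) {
                throw new TimeoutException("Waiting for leader election timed out");
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TimeoutException("Interrupted while waiting for leader");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Pretend a leader shows up after roughly 50ms
        waitLeaderElected(() ->
                System.currentTimeMillis() - start > 50 ? "peer-1" : null, 1000);
        System.out.println("leader elected, safe read can proceed");
    }
}
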
(changed file, path not shown)
@@ -23,9 +23,9 @@

 import com.alipay.sofa.jraft.rpc.RpcServer;
 import com.baidu.hugegraph.HugeGraph;
+import com.baidu.hugegraph.HugeGraphParams;
 import com.baidu.hugegraph.backend.store.BackendStore;
 import com.baidu.hugegraph.backend.store.BackendStoreProvider;
-import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.event.EventListener;
 import com.baidu.hugegraph.util.Log;

@@ -40,9 +40,9 @@ public class RaftBackendStoreProvider implements BackendStoreProvider {
     private RaftBackendStore systemStore;

     public RaftBackendStoreProvider(BackendStoreProvider provider,
-                                    HugeConfig config) {
+                                    HugeGraphParams params) {
         this.provider = provider;
-        this.context = new RaftSharedContext(config);
+        this.context = new RaftSharedContext(params);
         this.schemaStore = null;
         this.graphStore = null;
         this.systemStore = null;
(changed file, path not shown)
@@ -23,37 +23,29 @@
 import static com.baidu.hugegraph.backend.store.raft.RaftSharedContext.WAIT_LEADER_TIMEOUT;
 import static com.baidu.hugegraph.backend.store.raft.RaftSharedContext.WAIT_RPC_TIMEOUT;

-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Paths;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;

 import com.alipay.sofa.jraft.Node;
 import com.alipay.sofa.jraft.RaftGroupService;
 import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.conf.Configuration;
 import com.alipay.sofa.jraft.core.NodeImpl;
 import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.entity.Task;
 import com.alipay.sofa.jraft.error.RaftError;
 import com.alipay.sofa.jraft.option.NodeOptions;
-import com.alipay.sofa.jraft.option.RaftOptions;
 import com.alipay.sofa.jraft.rpc.ClientService;
 import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
-import com.baidu.hugegraph.HugeException;
 import com.baidu.hugegraph.backend.BackendException;
 import com.baidu.hugegraph.backend.store.BackendStore;
 import com.baidu.hugegraph.backend.store.raft.RaftRequests.StoreAction;
 import com.baidu.hugegraph.backend.store.raft.RaftRequests.StoreCommandRequest;
 import com.baidu.hugegraph.backend.store.raft.RaftRequests.StoreCommandResponse;
-import com.baidu.hugegraph.config.CoreOptions;
-import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.util.CodeUtil;
 import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.Log;
@@ -93,75 +85,22 @@ public Node node() {

     private Node initRaftNode(BackendStore store, RaftSharedContext context)
                               throws IOException {
-        HugeConfig config = context.config();
         String storePath = store.database() + "-" + store.store();
-        // TODO: When support sharding, groupId needs to be bound to shard Id
-        String groupId = storePath;
-        PeerId serverId = new PeerId();
-        serverId.parse(config.get(CoreOptions.RAFT_ENDPOINT));
-
-        NodeOptions nodeOptions = this.initNodeOptions(config);
+        NodeOptions nodeOptions = context.nodeOptions(storePath);
         nodeOptions.setFsm(this.stateMachine);

-        String raftPath = config.get(CoreOptions.RAFT_PATH);
-        String logUri = Paths.get(raftPath, "log", storePath).toString();
-        FileUtils.forceMkdir(new File(logUri));
-        nodeOptions.setLogUri(logUri);
-
-        String metaUri = Paths.get(raftPath, "meta", storePath).toString();
-        FileUtils.forceMkdir(new File(metaUri));
-        nodeOptions.setRaftMetaUri(metaUri);
-
-        if (config.get(CoreOptions.RAFT_USE_SNAPSHOT)) {
-            String snapshotUri = Paths.get(raftPath, "snapshot", storePath)
-                                      .toString();
-            FileUtils.forceMkdir(new File(snapshotUri));
-            nodeOptions.setSnapshotUri(snapshotUri);
-        }
-
-        RaftOptions raftOptions = nodeOptions.getRaftOptions();
-        /*
-         * NOTE: if buffer size is too small(<=1024), will throw exception
-         * "LogManager is busy, disk queue overload"
-         */
-        int queueSize = config.get(CoreOptions.RAFT_QUEUE_SIZE);
-        raftOptions.setDisruptorBufferSize(queueSize);
-        // raftOptions.setReplicatorPipeline(false);
-        // nodeOptions.setRpcProcessorThreadPoolSize(48);
-        // nodeOptions.setEnableMetrics(false);
+        // TODO: When support sharding, groupId needs to be bound to shard Id
+        String groupId = storePath;
+        PeerId endpoint = context.endpoint();

         RaftGroupService raftGroupService;
         // Shared rpc server
-        raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions,
+        raftGroupService = new RaftGroupService(groupId, endpoint, nodeOptions,
                                                 context.rpcServer(), true);
         // Start node
         return raftGroupService.start(false);
     }
-
-    private NodeOptions initNodeOptions(HugeConfig config) {
-        final NodeOptions nodeOptions = new NodeOptions();
-        int electionTimeout = config.get(CoreOptions.RAFT_ELECTION_TIMEOUT);
-        nodeOptions.setElectionTimeoutMs(electionTimeout);
-        nodeOptions.setDisableCli(false);
-
-        int snapshotInterval = config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL);
-        nodeOptions.setSnapshotIntervalSecs(snapshotInterval);
-
-        PeerId serverId = new PeerId();
-        String serverIdStr = config.get(CoreOptions.RAFT_ENDPOINT);
-        if (!serverId.parse(serverIdStr)) {
-            throw new HugeException("Failed to parse serverId %s", serverIdStr);
-        }
-
-        Configuration initConf = new Configuration();
-        String initConfStr = config.get(CoreOptions.RAFT_GROUP_PEERS);
-        if (!initConf.parse(initConfStr)) {
-            throw new HugeException("Failed to parse initConf %s", initConfStr);
-        }
-        nodeOptions.setInitialConf(initConf);
-        return nodeOptions;
-    }

     private void submitCommand(StoreCommand command, StoreClosure closure) {
         // Wait leader elected
         this.waitLeaderElected();
@@ -199,7 +138,7 @@ public Object submitAndWait(StoreCommand command, StoreClosure closure) {
         }
     }

-    private void waitLeaderElected() {
+    protected void waitLeaderElected() {
         if (this.node.getLeaderId() != null) {
             return;
         }
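
All of the NodeOptions wiring deleted above (log, meta, and snapshot directories, election timeout, snapshot interval, disruptor buffer size, peer and endpoint parsing) has moved behind context.nodeOptions(storePath) and context.endpoint(). The RaftSharedContext side is not among the hunks shown, so the following is only a sketch reassembled from the removed lines; the placement of these methods and their exact signatures inside RaftSharedContext are assumptions.

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;

import org.apache.commons.io.FileUtils;

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;

// Hypothetical sketch of the RaftSharedContext helpers now called from
// initRaftNode(); the bodies are reassembled from the code removed above,
// the class layout and method signatures are assumptions.
public class RaftSharedContextSketch {

    private final HugeConfig config;

    public RaftSharedContextSketch(HugeConfig config) {
        this.config = config;
    }

    public NodeOptions nodeOptions(String storePath) throws IOException {
        NodeOptions nodeOptions = new NodeOptions();
        int electionTimeout = this.config.get(CoreOptions.RAFT_ELECTION_TIMEOUT);
        nodeOptions.setElectionTimeoutMs(electionTimeout);
        nodeOptions.setDisableCli(false);

        int snapshotInterval = this.config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL);
        nodeOptions.setSnapshotIntervalSecs(snapshotInterval);

        Configuration initConf = new Configuration();
        String initConfStr = this.config.get(CoreOptions.RAFT_GROUP_PEERS);
        if (!initConf.parse(initConfStr)) {
            throw new HugeException("Failed to parse initConf %s", initConfStr);
        }
        nodeOptions.setInitialConf(initConf);

        String raftPath = this.config.get(CoreOptions.RAFT_PATH);
        String logUri = Paths.get(raftPath, "log", storePath).toString();
        FileUtils.forceMkdir(new File(logUri));
        nodeOptions.setLogUri(logUri);

        String metaUri = Paths.get(raftPath, "meta", storePath).toString();
        FileUtils.forceMkdir(new File(metaUri));
        nodeOptions.setRaftMetaUri(metaUri);

        if (this.config.get(CoreOptions.RAFT_USE_SNAPSHOT)) {
            String snapshotUri = Paths.get(raftPath, "snapshot", storePath)
                                      .toString();
            FileUtils.forceMkdir(new File(snapshotUri));
            nodeOptions.setSnapshotUri(snapshotUri);
        }

        /*
         * NOTE: if the buffer size is too small (<= 1024), jraft throws
         * "LogManager is busy, disk queue overload"
         */
        int queueSize = this.config.get(CoreOptions.RAFT_QUEUE_SIZE);
        nodeOptions.getRaftOptions().setDisruptorBufferSize(queueSize);
        return nodeOptions;
    }

    public PeerId endpoint() {
        PeerId endpoint = new PeerId();
        String endpointStr = this.config.get(CoreOptions.RAFT_ENDPOINT);
        if (!endpoint.parse(endpointStr)) {
            throw new HugeException("Failed to parse serverId %s", endpointStr);
        }
        return endpoint;
    }
}
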
(The diffs for the remaining changed files were not loaded in this view.)
