Skip to content

Commit 1cfb04b

Browse files
authored
[IOTDB-4138] Refactor consensus api and add javadoc (apache#7002)
1 parent 23cb64c commit 1cfb04b

File tree

14 files changed

+110
-59
lines changed

14 files changed

+110
-59
lines changed

confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void addConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
125125
for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
126126
peerList.add(new Peer(consensusGroupId, configNodeLocation.getConsensusEndPoint()));
127127
}
128-
consensusImpl.addConsensusGroup(consensusGroupId, peerList);
128+
consensusImpl.createPeer(consensusGroupId, peerList);
129129
}
130130

131131
/**

confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ public TSStatus removeConsensusGroup(TConfigNodeLocation configNodeLocation) thr
465465

466466
ConsensusGroupId groupId = configManager.getConsensusManager().getConsensusGroupId();
467467
ConsensusGenericResponse resp =
468-
configManager.getConsensusManager().getConsensusImpl().removeConsensusGroup(groupId);
468+
configManager.getConsensusManager().getConsensusImpl().deletePeer(groupId);
469469
if (!resp.isSuccess()) {
470470
return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
471471
.setMessage(

consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,66 @@ public interface IConsensus {
4444
ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest);
4545

4646
// multi consensus group API
47-
ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers);
4847

49-
ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId);
48+
/**
49+
* Require the <em>local node</em> to create a Peer and become a member of the given consensus
50+
* group. This node will prepare and initialize local statemachine {@link IStateMachine} and other
51+
* data structures. After this method returns, we can call {@link #addPeer(ConsensusGroupId,
52+
* Peer)} to notify original group that this new Peer is prepared to be added into the latest
53+
* configuration. createPeer should be called on a node that does not contain any peer of the
54+
* consensus group, to avoid one node having more than one replica.
55+
*
56+
* @param groupId the consensus group this Peer belongs
57+
* @param peers other known peers in this group
58+
*/
59+
ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers);
60+
61+
/**
62+
* When the <em>local node</em> is no longer a member of the given consensus group, call this
63+
* method to do cleanup works. This method will close local statemachine {@link IStateMachine},
64+
* delete local data and do other cleanup works. Be sure this method is called after successfully
65+
* removing this peer from current consensus group configuration (by calling {@link
66+
* #removePeer(ConsensusGroupId, Peer)} or {@link #changePeer(ConsensusGroupId, List)}).
67+
*
68+
* @param groupId the consensus group this Peer used to belong
69+
*/
70+
ConsensusGenericResponse deletePeer(ConsensusGroupId groupId);
5071

5172
// single consensus group API
73+
74+
/**
75+
* Tell the group that a new Peer is prepared to be added into this group. Call {@link
76+
* #createPeer(ConsensusGroupId, List)} on the new Peer before calling this method. When this
77+
* method returns, the group data should be already transmitted to the new Peer. That is, the new
78+
* peer is available to answer client requests by the time this method successfully returns.
79+
* addPeer should be called on a living peer of the consensus group. For example: We'd like to add
80+
* a peer D to (A, B, C) group. We need to execute addPeer in A, B or C.
81+
*
82+
* @param groupId the consensus group this peer belongs
83+
* @param peer the newly added peer
84+
*/
5285
ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer);
5386

87+
/**
88+
* Tell the group to remove an active Peer. The removed peer can no longer answer group requests
89+
* when this method successfully returns. Call {@link #deletePeer(ConsensusGroupId)} on the
90+
* removed Peer to do cleanup jobs after this method successfully returns. removePeer should be
91+
* called on a living peer of its consensus group. For example: a group has A, B, C. We'd like to
92+
* remove C, in case C is dead, the removePeer should be sent to A or B.
93+
*
94+
* @param groupId the consensus group this peer belongs
95+
* @param peer the peer to be removed
96+
*/
5497
ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer);
5598

99+
/**
100+
* Change group configuration. This method allows you to add/remove multiple Peers at once. This
101+
* method is similar to {@link #addPeer(ConsensusGroupId, Peer)} or {@link
102+
* #removePeer(ConsensusGroupId, Peer)}
103+
*
104+
* @param groupId the consensus group
105+
* @param newPeers the new member configuration of this group
106+
*/
56107
ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers);
57108

58109
// management API

consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest re
162162
}
163163

164164
@Override
165-
public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
165+
public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
166166
int consensusGroupSize = peers.size();
167167
if (consensusGroupSize == 0) {
168168
return ConsensusGenericResponse.newBuilder()
@@ -204,7 +204,7 @@ public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List
204204
}
205205

206206
@Override
207-
public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
207+
public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
208208
AtomicBoolean exist = new AtomicBoolean(false);
209209
stateMachineMap.computeIfPresent(
210210
groupId,

consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public ConsensusReadResponse read(
250250
* register self to the RaftGroup
251251
*/
252252
@Override
253-
public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
253+
public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
254254
RaftGroup group = buildRaftGroup(groupId, peers);
255255
// pre-conditions: myself in this new group
256256
if (!group.getPeers().contains(myself)) {
@@ -285,7 +285,7 @@ public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List
285285
* clean up
286286
*/
287287
@Override
288-
public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
288+
public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
289289
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
290290
RaftGroup raftGroup = getGroupInfo(raftGroupId);
291291

consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest re
136136
}
137137

138138
@Override
139-
public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
139+
public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
140140
int consensusGroupSize = peers.size();
141141
if (consensusGroupSize != 1) {
142142
return ConsensusGenericResponse.newBuilder()
@@ -172,7 +172,7 @@ public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List
172172
}
173173

174174
@Override
175-
public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
175+
public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
176176
AtomicBoolean exist = new AtomicBoolean(false);
177177
stateMachineMap.computeIfPresent(
178178
groupId,

consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ private void stopServer() {
116116
@Test
117117
public void ReplicateUsingQueueTest() throws IOException, InterruptedException {
118118
logger.info("Start ReplicateUsingQueueTest");
119-
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
120-
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
121-
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
119+
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
120+
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
121+
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
122122

123123
Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
124124
Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
@@ -205,8 +205,8 @@ public void ReplicateUsingQueueTest() throws IOException, InterruptedException {
205205
@Test
206206
public void ReplicateUsingWALTest() throws IOException, InterruptedException {
207207
logger.info("Start ReplicateUsingWALTest");
208-
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
209-
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
208+
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
209+
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
210210

211211
Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
212212
Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
@@ -224,7 +224,7 @@ public void ReplicateUsingWALTest() throws IOException, InterruptedException {
224224
stopServer();
225225
initServer();
226226

227-
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
227+
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
228228

229229
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
230230
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());

consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,19 @@ public void tearDown() throws IOException {
7575

7676
@Test
7777
public void recoveryTest() throws Exception {
78-
consensusImpl.addConsensusGroup(
78+
consensusImpl.createPeer(
7979
schemaRegionId,
8080
Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 9000))));
8181

82-
consensusImpl.removeConsensusGroup(schemaRegionId);
82+
consensusImpl.deletePeer(schemaRegionId);
8383

8484
consensusImpl.stop();
8585
consensusImpl = null;
8686

8787
constructConsensus();
8888

8989
ConsensusGenericResponse response =
90-
consensusImpl.addConsensusGroup(
90+
consensusImpl.createPeer(
9191
schemaRegionId,
9292
Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 9000))));
9393

consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ public void tearDown() throws IOException {
122122

123123
@Test
124124
public void basicConsensus3Copy() throws Exception {
125-
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
126-
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
127-
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
125+
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
126+
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
127+
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
128128

129129
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
130130
}
@@ -133,14 +133,14 @@ public void basicConsensus3Copy() throws Exception {
133133
public void addMemberToGroup() throws Exception {
134134
List<Peer> original = peers.subList(0, 1);
135135

136-
servers.get(0).addConsensusGroup(group.getGroupId(), original);
136+
servers.get(0).createPeer(group.getGroupId(), original);
137137
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
138138

139139
// add 2 members
140-
servers.get(1).addConsensusGroup(group.getGroupId(), peers);
140+
servers.get(1).createPeer(group.getGroupId(), peers);
141141
servers.get(0).addPeer(group.getGroupId(), peers.get(1));
142142

143-
servers.get(2).addConsensusGroup(group.getGroupId(), peers);
143+
servers.get(2).createPeer(group.getGroupId(), peers);
144144
servers.get(0).changePeer(group.getGroupId(), peers);
145145

146146
Assert.assertEquals(stateMachines.get(0).getConfiguration().size(), 3);
@@ -149,26 +149,26 @@ public void addMemberToGroup() throws Exception {
149149

150150
@Test
151151
public void removeMemberFromGroup() throws Exception {
152-
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
153-
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
154-
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
152+
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
153+
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
154+
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
155155

156156
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
157157

158158
servers.get(0).transferLeader(gid, peers.get(0));
159159
servers.get(0).removePeer(gid, peers.get(1));
160-
servers.get(1).removeConsensusGroup(gid);
160+
servers.get(1).deletePeer(gid);
161161
servers.get(0).removePeer(gid, peers.get(2));
162-
servers.get(2).removeConsensusGroup(gid);
162+
servers.get(2).deletePeer(gid);
163163

164164
doConsensus(servers.get(0), group.getGroupId(), 10, 20);
165165
}
166166

167167
@Test
168168
public void crashAndStart() throws Exception {
169-
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
170-
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
171-
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
169+
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
170+
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
171+
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
172172

173173
// 200 operation will trigger snapshot & purge
174174
doConsensus(servers.get(0), group.getGroupId(), 200, 200);
@@ -179,9 +179,9 @@ public void crashAndStart() throws Exception {
179179
servers.clear();
180180

181181
makeServers();
182-
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
183-
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
184-
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
182+
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
183+
servers.get(1).createPeer(group.getGroupId(), group.getPeers());
184+
servers.get(2).createPeer(group.getGroupId(), group.getPeers());
185185
doConsensus(servers.get(0), gid, 10, 210);
186186
}
187187

consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void tearDown() throws IOException {
7474

7575
@Test
7676
public void recoveryTest() throws Exception {
77-
consensusImpl.addConsensusGroup(
77+
consensusImpl.createPeer(
7878
schemaRegionId,
7979
Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 9000))));
8080

@@ -84,7 +84,7 @@ public void recoveryTest() throws Exception {
8484
constructConsensus();
8585

8686
ConsensusGenericResponse response =
87-
consensusImpl.addConsensusGroup(
87+
consensusImpl.createPeer(
8888
schemaRegionId,
8989
Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 9000))));
9090

0 commit comments

Comments
 (0)