Skip to content

Commit 544c60f

Browse files
authored
[IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (apache#7006)
1 parent 81f5ca6 commit 544c60f

File tree

19 files changed

+538
-183
lines changed

19 files changed

+538
-183
lines changed

confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iotdb.confignode.client.async.datanode;
2020

2121
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
2223
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2324
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2425
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -50,6 +51,7 @@
5051
import org.slf4j.Logger;
5152
import org.slf4j.LoggerFactory;
5253

54+
import java.util.HashMap;
5355
import java.util.List;
5456
import java.util.Map;
5557
import java.util.concurrent.ConcurrentHashMap;
@@ -231,18 +233,18 @@ public void sendAsyncRequestToDataNode(
231233
}
232234

233235
/**
234-
* Execute CreateRegionsReq asynchronously
236+
* Execute CreateRegionGroupsPlan asynchronously
235237
*
236-
* @param createRegionGroupsPlan CreateRegionsReq
237238
* @param ttlMap Map<StorageGroupName, TTL>
239+
* @return The RegionGroups that failed to be created
238240
*/
239-
public void createRegions(
241+
public Map<TConsensusGroupId, TRegionReplicaSet> createRegionGroups(
240242
CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
241243
// Different requests may be sent to the same DataNode when creating Regions,
242244
// so key the DataNode locations by request index: Map<index, TDataNodeLocation>
243245
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
244246
int index = 0;
245-
// Count the datanodes to be sent
247+
// Count the DataNodes to be sent
246248
for (List<TRegionReplicaSet> regionReplicaSets :
247249
createRegionGroupsPlan.getRegionGroupMap().values()) {
248250
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
@@ -252,7 +254,7 @@ public void createRegions(
252254
}
253255
}
254256
if (dataNodeLocationMap.isEmpty()) {
255-
return;
257+
return new HashMap<>();
256258
}
257259
for (int retry = 0; retry < retryNum; retry++) {
258260
index = 0;
@@ -298,7 +300,7 @@ public void createRegions(
298300
retry);
299301
break;
300302
default:
301-
return;
303+
break;
302304
}
303305
} else {
304306
index++;
@@ -317,6 +319,43 @@ public void createRegions(
317319
break;
318320
}
319321
}
322+
323+
// Filter RegionGroups that weren't created successfully
324+
index = 0;
325+
Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
326+
for (List<TRegionReplicaSet> regionReplicaSets :
327+
createRegionGroupsPlan.getRegionGroupMap().values()) {
328+
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
329+
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
330+
if (dataNodeLocationMap.containsKey(index)) {
331+
failedRegions
332+
.computeIfAbsent(
333+
regionReplicaSet.getRegionId(),
334+
empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
335+
.addToDataNodeLocations(dataNodeLocation);
336+
}
337+
index += 1;
338+
}
339+
}
340+
}
341+
return failedRegions;
342+
}
343+
344+
private TCreateSchemaRegionReq genCreateSchemaRegionReq(
345+
String storageGroup, TRegionReplicaSet regionReplicaSet) {
346+
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
347+
req.setStorageGroup(storageGroup);
348+
req.setRegionReplicaSet(regionReplicaSet);
349+
return req;
350+
}
351+
352+
private TCreateDataRegionReq genCreateDataRegionReq(
353+
String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
354+
TCreateDataRegionReq req = new TCreateDataRegionReq();
355+
req.setStorageGroup(storageGroup);
356+
req.setRegionReplicaSet(regionReplicaSet);
357+
req.setTtl(TTL);
358+
return req;
320359
}
321360

322361
/**
@@ -341,23 +380,6 @@ public void broadCastTheLatestConfigNodeGroup(
341380
}
342381
}
343382

344-
private TCreateSchemaRegionReq genCreateSchemaRegionReq(
345-
String storageGroup, TRegionReplicaSet regionReplicaSet) {
346-
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
347-
req.setStorageGroup(storageGroup);
348-
req.setRegionReplicaSet(regionReplicaSet);
349-
return req;
350-
}
351-
352-
private TCreateDataRegionReq genCreateDataRegionReq(
353-
String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
354-
TCreateDataRegionReq req = new TCreateDataRegionReq();
355-
req.setStorageGroup(storageGroup);
356-
req.setRegionReplicaSet(regionReplicaSet);
357-
req.setTtl(TTL);
358-
return req;
359-
}
360-
361383
/**
362384
* Always call this interface when a DataNode is restarted or removed
363385
*

confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,21 @@ private void deleteRegions(
134134
List<TConsensusGroupId> regionIds,
135135
Set<TRegionReplicaSet> deletedRegionSet) {
136136
for (TConsensusGroupId regionId : regionIds) {
137-
LOGGER.debug("Delete region {} ", regionId);
137+
LOGGER.info("Try to delete RegionReplica: {} on DataNode: {}", regionId, endPoint);
138138
final TSStatus status =
139139
sendSyncRequestToDataNodeWithRetry(
140140
endPoint, regionId, DataNodeRequestType.DELETE_REGIONS);
141+
141142
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
142-
LOGGER.info("DELETE Region {} successfully", regionId);
143-
deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
143+
LOGGER.info("Delete RegionReplica: {} on DataNode: {} successfully", regionId, endPoint);
144+
} else {
145+
LOGGER.warn(
146+
"Failed to delete RegionReplica: {} on DataNode: {}. You might need to delete it manually",
147+
regionId,
148+
endPoint);
144149
}
150+
151+
deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
145152
}
146153
}
147154

confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
4141
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
4242
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
43-
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsPlan;
43+
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
4444
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
4545
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
4646
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
@@ -143,8 +143,8 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
143143
case CreateRegionGroups:
144144
req = new CreateRegionGroupsPlan();
145145
break;
146-
case DeleteRegions:
147-
req = new DeleteRegionsPlan();
146+
case DeleteRegionGroups:
147+
req = new DeleteRegionGroupsPlan();
148148
break;
149149
case GetSchemaPartition:
150150
req = new GetSchemaPartitionPlan();

confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public enum ConfigPhysicalPlanType {
3232
GetStorageGroup,
3333
CountStorageGroup,
3434
CreateRegionGroups,
35-
DeleteRegions,
35+
DeleteRegionGroups,
3636
GetSchemaPartition,
3737
CreateSchemaPartition,
3838
GetOrCreateSchemaPartition,

confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,26 @@
2828
import java.io.IOException;
2929
import java.nio.ByteBuffer;
3030
import java.util.ArrayList;
31+
import java.util.HashMap;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Map.Entry;
3435
import java.util.Objects;
35-
import java.util.TreeMap;
3636

3737
/** Create regions for specific StorageGroups */
3838
public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
3939

4040
// Map<StorageGroupName, List<TRegionReplicaSet>>
41-
private final Map<String, List<TRegionReplicaSet>> regionGroupMap;
41+
protected final Map<String, List<TRegionReplicaSet>> regionGroupMap;
4242

4343
public CreateRegionGroupsPlan() {
4444
super(ConfigPhysicalPlanType.CreateRegionGroups);
45-
this.regionGroupMap = new TreeMap<>();
45+
this.regionGroupMap = new HashMap<>();
46+
}
47+
48+
public CreateRegionGroupsPlan(ConfigPhysicalPlanType type) {
49+
super(type);
50+
this.regionGroupMap = new HashMap<>();
4651
}
4752

4853
public Map<String, List<TRegionReplicaSet>> getRegionGroupMap() {

confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java renamed to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,62 +18,60 @@
1818
*/
1919
package org.apache.iotdb.confignode.consensus.request.write;
2020

21-
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
21+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2222
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
2323
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
24-
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
2524
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
2625

2726
import java.io.DataOutputStream;
2827
import java.io.IOException;
2928
import java.nio.ByteBuffer;
3029
import java.util.ArrayList;
31-
import java.util.HashMap;
3230
import java.util.List;
3331
import java.util.Map;
3432
import java.util.Objects;
3533

36-
public class DeleteRegionsPlan extends ConfigPhysicalPlan {
34+
public class DeleteRegionGroupsPlan extends CreateRegionGroupsPlan {
3735

38-
private final Map<String, List<TConsensusGroupId>> deleteRegionMap;
36+
boolean needsDeleteInPartitionTable = true;
3937

40-
public DeleteRegionsPlan() {
41-
super(ConfigPhysicalPlanType.DeleteRegions);
42-
this.deleteRegionMap = new HashMap<>();
38+
public DeleteRegionGroupsPlan() {
39+
super(ConfigPhysicalPlanType.DeleteRegionGroups);
4340
}
4441

45-
public void addDeleteRegion(String name, TConsensusGroupId consensusGroupId) {
46-
deleteRegionMap.computeIfAbsent(name, empty -> new ArrayList<>()).add(consensusGroupId);
42+
public boolean isNeedsDeleteInPartitionTable() {
43+
return needsDeleteInPartitionTable;
4744
}
4845

49-
public Map<String, List<TConsensusGroupId>> getDeleteRegionMap() {
50-
return deleteRegionMap;
46+
public void setNeedsDeleteInPartitionTable(boolean needsDeleteInPartitionTable) {
47+
this.needsDeleteInPartitionTable = needsDeleteInPartitionTable;
5148
}
5249

5350
@Override
5451
protected void serializeImpl(DataOutputStream stream) throws IOException {
55-
stream.writeInt(ConfigPhysicalPlanType.DeleteRegions.ordinal());
52+
stream.writeInt(ConfigPhysicalPlanType.DeleteRegionGroups.ordinal());
5653

57-
stream.writeInt(deleteRegionMap.size());
58-
for (Map.Entry<String, List<TConsensusGroupId>> consensusGroupIdsEntry :
59-
deleteRegionMap.entrySet()) {
60-
BasicStructureSerDeUtil.write(consensusGroupIdsEntry.getKey(), stream);
61-
stream.writeInt(consensusGroupIdsEntry.getValue().size());
62-
for (TConsensusGroupId consensusGroupId : consensusGroupIdsEntry.getValue()) {
63-
ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream);
54+
stream.writeByte(needsDeleteInPartitionTable ? 1 : 0);
55+
stream.writeInt(regionGroupMap.size());
56+
for (Map.Entry<String, List<TRegionReplicaSet>> entry : regionGroupMap.entrySet()) {
57+
BasicStructureSerDeUtil.write(entry.getKey(), stream);
58+
stream.writeInt(entry.getValue().size());
59+
for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
60+
ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream);
6461
}
6562
}
6663
}
6764

6865
@Override
6966
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
67+
needsDeleteInPartitionTable = buffer.get() > 0;
7068
int length = buffer.getInt();
7169
for (int i = 0; i < length; i++) {
7270
String name = BasicStructureSerDeUtil.readString(buffer);
73-
deleteRegionMap.put(name, new ArrayList<>());
71+
regionGroupMap.put(name, new ArrayList<>());
7472
int regionNum = buffer.getInt();
7573
for (int j = 0; j < regionNum; j++) {
76-
deleteRegionMap.get(name).add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
74+
regionGroupMap.get(name).add(ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer));
7775
}
7876
}
7977
}
@@ -82,12 +80,12 @@ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
8280
public boolean equals(Object o) {
8381
if (this == o) return true;
8482
if (o == null || getClass() != o.getClass()) return false;
85-
DeleteRegionsPlan that = (DeleteRegionsPlan) o;
86-
return deleteRegionMap.equals(that.deleteRegionMap);
83+
DeleteRegionGroupsPlan that = (DeleteRegionGroupsPlan) o;
84+
return regionGroupMap.equals(that.regionGroupMap);
8785
}
8886

8987
@Override
9088
public int hashCode() {
91-
return Objects.hash(deleteRegionMap);
89+
return Objects.hash(regionGroupMap);
9290
}
9391
}

confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -616,22 +616,7 @@ public Object getOrCreateDataPartition(
616616
}
617617

618618
private TSStatus confirmLeader() {
619-
TSStatus result = new TSStatus();
620-
621-
if (getConsensusManager().isLeader()) {
622-
return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
623-
} else {
624-
result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
625-
result.setMessage(
626-
"The current ConfigNode is not leader, please redirect to a new ConfigNode.");
627-
628-
TConfigNodeLocation leaderLocation = consensusManager.getLeader();
629-
if (leaderLocation != null) {
630-
result.setRedirectNode(leaderLocation.getInternalEndPoint());
631-
}
632-
633-
return result;
634-
}
619+
return getConsensusManager().confirmLeader();
635620
}
636621

637622
@Override

confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2324
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
2425
import org.apache.iotdb.commons.consensus.PartitionRegionId;
2526
import org.apache.iotdb.commons.exception.BadNodeUrlException;
@@ -36,6 +37,7 @@
3637
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
3738
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
3839
import org.apache.iotdb.consensus.config.ConsensusConfig;
40+
import org.apache.iotdb.rpc.TSStatusCode;
3941

4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
@@ -202,6 +204,30 @@ public TConfigNodeLocation getLeader() {
202204
return null;
203205
}
204206

207+
/**
208+
* Confirm the current ConfigNode's leadership
209+
*
210+
* @return SUCCESS_STATUS if the current ConfigNode is the leader, NEED_REDIRECTION otherwise
211+
*/
212+
public TSStatus confirmLeader() {
213+
TSStatus result = new TSStatus();
214+
215+
if (isLeader()) {
216+
return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
217+
} else {
218+
result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
219+
result.setMessage(
220+
"The current ConfigNode is not leader, please redirect to a new ConfigNode.");
221+
222+
TConfigNodeLocation leaderLocation = getLeader();
223+
if (leaderLocation != null) {
224+
result.setRedirectNode(leaderLocation.getInternalEndPoint());
225+
}
226+
227+
return result;
228+
}
229+
}
230+
205231
public ConsensusGroupId getConsensusGroupId() {
206232
return consensusGroupId;
207233
}

0 commit comments

Comments
 (0)