2020package org .apache .iotdb .confignode .procedure .env ;
2121
2222import org .apache .iotdb .common .rpc .thrift .TConsensusGroupId ;
23+ import org .apache .iotdb .common .rpc .thrift .TConsensusGroupType ;
2324import org .apache .iotdb .common .rpc .thrift .TDataNodeConfiguration ;
2425import org .apache .iotdb .common .rpc .thrift .TDataNodeLocation ;
2526import org .apache .iotdb .common .rpc .thrift .TRegionReplicaSet ;
3738import org .apache .iotdb .confignode .consensus .request .write .datanode .RemoveDataNodePlan ;
3839import org .apache .iotdb .confignode .consensus .response .datanode .DataNodeToStatusResp ;
3940import org .apache .iotdb .confignode .manager .ConfigManager ;
41+ import org .apache .iotdb .confignode .manager .load .balancer .region .GreedyCopySetRegionGroupAllocator ;
42+ import org .apache .iotdb .confignode .manager .load .balancer .region .IRegionGroupAllocator ;
4043import org .apache .iotdb .confignode .manager .load .cache .node .NodeHeartbeatSample ;
4144import org .apache .iotdb .confignode .manager .load .cache .region .RegionHeartbeatSample ;
4245import org .apache .iotdb .confignode .manager .partition .PartitionMetrics ;
5154
5255import java .util .ArrayList ;
5356import java .util .Collections ;
57+ import java .util .HashMap ;
58+ import java .util .HashSet ;
5459import java .util .List ;
5560import java .util .Map ;
5661import java .util .Set ;
5762import java .util .TreeMap ;
63+ import java .util .function .Function ;
5864import java .util .stream .Collectors ;
5965
6066import static org .apache .iotdb .confignode .conf .ConfigNodeConstant .REMOVE_DATANODE_PROCESS ;
@@ -70,8 +76,22 @@ public class RemoveDataNodeHandler {
7076
7177 private final ConfigManager configManager ;
7278
79+ private final IRegionGroupAllocator regionGroupAllocator ;
80+
7381 public RemoveDataNodeHandler (ConfigManager configManager ) {
7482 this .configManager = configManager ;
83+
84+ switch (ConfigNodeDescriptor .getInstance ().getConf ().getRegionGroupAllocatePolicy ()) {
85+ case GREEDY :
86+ this .regionGroupAllocator = new GreedyCopySetRegionGroupAllocator ();
87+ break ;
88+ case PGR :
89+ this .regionGroupAllocator = new GreedyCopySetRegionGroupAllocator ();
90+ break ;
91+ case GCR :
92+ default :
93+ this .regionGroupAllocator = new GreedyCopySetRegionGroupAllocator ();
94+ }
7595 }
7696
7797 /**
@@ -193,6 +213,172 @@ public List<RegionMigrationPlan> getRegionMigrationPlans(
193213 return regionMigrationPlans ;
194214 }
195215
216+ /**
217+ * Retrieves all region migration plans for the specified removed DataNodes and selects the
218+ * destination.
219+ *
220+ * @param removedDataNodes the list of DataNodes from which to obtain migration plans
221+ * @return a list of region migration plans associated with the removed DataNodes
222+ */
223+ public List <RegionMigrationPlan > selectedRegionMigrationPlans (
224+ List <TDataNodeLocation > removedDataNodes ) {
225+
226+ Set <Integer > removedDataNodesSet = new HashSet <>();
227+ for (TDataNodeLocation removedDataNode : removedDataNodes ) {
228+ removedDataNodesSet .add (removedDataNode .dataNodeId );
229+ }
230+
231+ final List <TDataNodeConfiguration > availableDataNodes =
232+ configManager
233+ .getNodeManager ()
234+ .filterDataNodeThroughStatus (NodeStatus .Running , NodeStatus .Unknown )
235+ .stream ()
236+ .filter (node -> !removedDataNodesSet .contains (node .getLocation ().getDataNodeId ()))
237+ .collect (Collectors .toList ());
238+
239+ List <RegionMigrationPlan > regionMigrationPlans = new ArrayList <>();
240+
241+ regionMigrationPlans .addAll (
242+ selectMigrationPlans (availableDataNodes , TConsensusGroupType .DataRegion , removedDataNodes ));
243+
244+ regionMigrationPlans .addAll (
245+ selectMigrationPlans (
246+ availableDataNodes , TConsensusGroupType .SchemaRegion , removedDataNodes ));
247+
248+ return regionMigrationPlans ;
249+ }
250+
251+ public List <RegionMigrationPlan > selectMigrationPlans (
252+ List <TDataNodeConfiguration > availableDataNodes ,
253+ TConsensusGroupType consensusGroupType ,
254+ List <TDataNodeLocation > removedDataNodes ) {
255+
256+ // Retrieve all allocated replica sets for the given consensus group type
257+ List <TRegionReplicaSet > allocatedReplicaSets =
258+ configManager .getPartitionManager ().getAllReplicaSets (consensusGroupType );
259+
260+ // Step 1: Identify affected replica sets and record the removed DataNode for each replica set
261+ Map <TConsensusGroupId , TDataNodeLocation > removedNodeMap = new HashMap <>();
262+ Set <TRegionReplicaSet > affectedReplicaSets =
263+ identifyAffectedReplicaSets (allocatedReplicaSets , removedDataNodes , removedNodeMap );
264+
265+ // Step 2: Update affected replica sets by removing the removed DataNode
266+ updateReplicaSets (allocatedReplicaSets , affectedReplicaSets , removedNodeMap );
267+
268+ // Build a mapping of available DataNodes and their free disk space (computed only once)
269+ Map <Integer , TDataNodeConfiguration > availableDataNodeMap =
270+ buildAvailableDataNodeMap (availableDataNodes );
271+ Map <Integer , Double > freeDiskSpaceMap = buildFreeDiskSpaceMap (availableDataNodes );
272+
273+ // Step 3: For each affected replica set, select a new destination DataNode and create a
274+ // migration plan
275+ List <RegionMigrationPlan > migrationPlans = new ArrayList <>();
276+
277+ Map <TConsensusGroupId , TRegionReplicaSet > remainReplicasMap = new HashMap <>();
278+ Map <TConsensusGroupId , String > regionDatabaseMap = new HashMap <>();
279+ Map <String , List <TRegionReplicaSet >> databaseAllocatedRegionGroupMap = new HashMap <>();
280+
281+ for (TRegionReplicaSet replicaSet : affectedReplicaSets ) {
282+ remainReplicasMap .put (replicaSet .getRegionId (), replicaSet );
283+ String database =
284+ configManager .getPartitionManager ().getRegionDatabase (replicaSet .getRegionId ());
285+ List <TRegionReplicaSet > databaseAllocatedReplicaSets =
286+ configManager .getPartitionManager ().getAllReplicaSets (database , consensusGroupType );
287+ regionDatabaseMap .put (replicaSet .getRegionId (), database );
288+ databaseAllocatedRegionGroupMap .put (database , databaseAllocatedReplicaSets );
289+ }
290+
291+ Map <TConsensusGroupId , TDataNodeConfiguration > result =
292+ regionGroupAllocator .removeNodeReplicaSelect (
293+ availableDataNodeMap ,
294+ freeDiskSpaceMap ,
295+ allocatedReplicaSets ,
296+ regionDatabaseMap ,
297+ databaseAllocatedRegionGroupMap ,
298+ remainReplicasMap );
299+
300+ for (TConsensusGroupId regionId : result .keySet ()) {
301+
302+ TDataNodeConfiguration selectedNode = result .get (regionId );
303+ LOGGER .info (
304+ "Selected DataNode {} for Region {}" ,
305+ selectedNode .getLocation ().getDataNodeId (),
306+ regionId );
307+
308+ // Create the migration plan
309+ RegionMigrationPlan plan = RegionMigrationPlan .create (regionId , removedNodeMap .get (regionId ));
310+ plan .setToDataNode (selectedNode .getLocation ());
311+ migrationPlans .add (plan );
312+ }
313+ return migrationPlans ;
314+ }
315+
316+ /**
317+ * Identifies affected replica sets from allocatedReplicaSets that contain any DataNode in
318+ * removedDataNodes, and records the removed DataNode for each replica set.
319+ */
320+ private Set <TRegionReplicaSet > identifyAffectedReplicaSets (
321+ List <TRegionReplicaSet > allocatedReplicaSets ,
322+ List <TDataNodeLocation > removedDataNodes ,
323+ Map <TConsensusGroupId , TDataNodeLocation > removedNodeMap ) {
324+
325+ Set <TRegionReplicaSet > affectedReplicaSets = new HashSet <>();
326+ // Create a copy of allocatedReplicaSets to avoid concurrent modifications
327+ List <TRegionReplicaSet > allocatedCopy = new ArrayList <>(allocatedReplicaSets );
328+
329+ for (TDataNodeLocation removedNode : removedDataNodes ) {
330+ allocatedCopy .stream ()
331+ .filter (replicaSet -> replicaSet .getDataNodeLocations ().contains (removedNode ))
332+ .forEach (
333+ replicaSet -> {
334+ removedNodeMap .put (replicaSet .getRegionId (), removedNode );
335+ affectedReplicaSets .add (replicaSet );
336+ });
337+ }
338+ return affectedReplicaSets ;
339+ }
340+
341+ /**
342+ * Updates each affected replica set by removing the removed DataNode from its list. The
343+ * allocatedReplicaSets list is updated accordingly.
344+ */
345+ private void updateReplicaSets (
346+ List <TRegionReplicaSet > allocatedReplicaSets ,
347+ Set <TRegionReplicaSet > affectedReplicaSets ,
348+ Map <TConsensusGroupId , TDataNodeLocation > removedNodeMap ) {
349+ for (TRegionReplicaSet replicaSet : affectedReplicaSets ) {
350+ // Remove the replica set, update its node list, then re-add it
351+ allocatedReplicaSets .remove (replicaSet );
352+ replicaSet .getDataNodeLocations ().remove (removedNodeMap .get (replicaSet .getRegionId ()));
353+ allocatedReplicaSets .add (replicaSet );
354+ }
355+ }
356+
357+ /**
358+ * Constructs a mapping from DataNodeId to TDataNodeConfiguration from the available DataNodes.
359+ */
360+ private Map <Integer , TDataNodeConfiguration > buildAvailableDataNodeMap (
361+ List <TDataNodeConfiguration > availableDataNodes ) {
362+ return availableDataNodes .stream ()
363+ .collect (
364+ Collectors .toMap (
365+ dataNode -> dataNode .getLocation ().getDataNodeId (), Function .identity ()));
366+ }
367+
368+ /** Constructs a mapping of free disk space for each DataNode. */
369+ private Map <Integer , Double > buildFreeDiskSpaceMap (
370+ List <TDataNodeConfiguration > availableDataNodes ) {
371+ Map <Integer , Double > freeDiskSpaceMap = new HashMap <>(availableDataNodes .size ());
372+ availableDataNodes .forEach (
373+ dataNode ->
374+ freeDiskSpaceMap .put (
375+ dataNode .getLocation ().getDataNodeId (),
376+ configManager
377+ .getLoadManager ()
378+ .getFreeDiskSpace (dataNode .getLocation ().getDataNodeId ())));
379+ return freeDiskSpaceMap ;
380+ }
381+
196382 /**
197383 * Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write
198384 * requests.
0 commit comments