Skip to content

Commit 94d312c

Browse files
committed
add multi database support
1 parent 6102225 commit 94d312c

File tree

3 files changed

+135
-15
lines changed

3 files changed

+135
-15
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java

Lines changed: 129 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
5353
private int[] databaseRegionCounter;
5454
// The number of 2-Region combinations in current cluster
5555
private int[][] combinationCounter;
56+
// The initial load for each database on each datanode
57+
private Map<String, int[]> initialDbLoad;
5658

5759
// First Key: the sum of Regions at the DataNodes in the allocation result is minimal
5860
int optimalRegionSum;
@@ -139,6 +141,7 @@ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
139141
List<TRegionReplicaSet> databaseAllocatedRegionGroups =
140142
new ArrayList<>(databaseAllocatedRegionGroupMap.values()).get(0);
141143
prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups);
144+
computeInitialDbLoad(databaseAllocatedRegionGroupMap);
142145

143146
// 2. Build allowed candidate set for each region that needs to be migrated.
144147
// For each region in remainReplicasMap, the candidate destination nodes are all nodes in
@@ -198,7 +201,8 @@ public Map<TConsensusGroupId, TDataNodeConfiguration> removeNodeReplicaSelect(
198201
additionalLoad,
199202
optimalAssignments,
200203
bestMetrics,
201-
remainReplicasMap);
204+
remainReplicasMap,
205+
regionDatabaseMap);
202206

203207
// 4. Randomly select one solution from the candidate buffer
204208
if (optimalAssignments.isEmpty()) {
@@ -259,7 +263,8 @@ private void dfsRemoveNodeReplica(
259263
int[] additionalLoad,
260264
List<int[]> optimalAssignments,
261265
int[] bestMetrics,
262-
Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
266+
Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap,
267+
Map<TConsensusGroupId, String> regionDatabaseMap) {
263268
int n = regionKeys.size();
264269
if (index == n) {
265270
// A complete assignment has been generated.
@@ -281,7 +286,9 @@ private void dfsRemoveNodeReplica(
281286

282287
// Compute the maximum global load and maximum database load among all nodes that received
283288
// additional load.
284-
int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter);
289+
int[] currentMetrics =
290+
getCurrentMetrics(
291+
additionalLoad, currentScatter, regionKeys, regionDatabaseMap, currentAssignment);
285292

286293
// Lexicographically compare currentMetrics with bestMetrics.
287294
// If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate
@@ -328,24 +335,133 @@ private void dfsRemoveNodeReplica(
328335
additionalLoad,
329336
optimalAssignments,
330337
bestMetrics,
331-
remainReplicasMap);
338+
remainReplicasMap,
339+
regionDatabaseMap);
332340
// Backtrack
333341
additionalLoad[candidate]--;
334342
}
335343
}
336344

337-
private int[] getCurrentMetrics(int[] additionalLoad, int currentScatter) {
338-
int currentMaxLoad = 0;
339-
int currentMaxDBLoad = 0;
340-
for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
341-
if (additionalLoad[nodeId] > 0) {
342-
currentMaxLoad = Math.max(currentMaxLoad, regionCounter[nodeId] + additionalLoad[nodeId]);
343-
currentMaxDBLoad =
344-
Math.max(currentMaxDBLoad, databaseRegionCounter[nodeId] + additionalLoad[nodeId]);
345+
/**
346+
* Computes the sum, across all databases, of the squared maximum per-node load.
347+
*
348+
* <p>For each database, this method calculates the maximum load on any data node by summing the
349+
* initial load (from {@code initialDbLoad}) with the additional load assigned during migration
350+
* (derived from {@code currentAssignment}), and then squares this maximum load. Finally, it
351+
* returns the sum of these squared maximum loads across all databases.
352+
*
* <p>Only databases that are keys of {@code initialDbLoad} contribute to the result, and node
* indices range over {@code [0, regionCounter.length)}.
*
353+
* @param currentAssignment an array where each element is the nodeId assigned for the
354+
* corresponding region in {@code regionKeys}.
355+
* @param regionKeys a list of region identifiers (TConsensusGroupId) representing the regions
356+
* under migration.
357+
* @param regionDatabaseMap a mapping from each region identifier to its corresponding database
358+
* name.
359+
* @return the sum of the squares of the maximum loads computed for each database.
* @throws NullPointerException if a region in {@code regionKeys} has no entry in {@code
*     regionDatabaseMap}, or maps to a database that is not a key of {@code initialDbLoad}.
360+
*/
361+
private int computeDatabaseLoadSquaredSum(
362+
int[] currentAssignment,
363+
List<TConsensusGroupId> regionKeys,
364+
Map<TConsensusGroupId, String> regionDatabaseMap) {
365+
Map<String, int[]> extraLoadPerDb = new HashMap<>();
366+
// Initialize extra load counters for each database using the number of nodes from
367+
// regionCounter.
368+
for (String db : initialDbLoad.keySet()) {
369+
extraLoadPerDb.put(db, new int[regionCounter.length]);
370+
}
371+
// Accumulate extra load per database based on the current assignment.
372+
for (int i = 0; i < regionKeys.size(); i++) {
373+
TConsensusGroupId regionId = regionKeys.get(i);
374+
String db = regionDatabaseMap.get(regionId);
375+
int nodeId = currentAssignment[i];
376+
// NOTE(review): no null check here; the lookup yields null for a database that was not
// registered in initialDbLoad, so the increment below would throw.
extraLoadPerDb.get(db)[nodeId]++;
377+
}
378+
int sumSquared = 0;
379+
// For each database, compute the maximum load across nodes and add its square to the sum.
380+
for (String db : initialDbLoad.keySet()) {
381+
int[] initLoads = initialDbLoad.get(db);
382+
int[] extras = extraLoadPerDb.get(db);
383+
int maxLoad = 0;
384+
for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
385+
int load = initLoads[nodeId] + extras[nodeId];
386+
if (load > maxLoad) {
387+
maxLoad = load;
388+
}
345389
}
390+
sumSquared += maxLoad * maxLoad;
391+
}
392+
return sumSquared;
393+
}
394+
395+
/**
396+
* Computes the current migration metrics.
397+
*
398+
* <p>This method calculates three key metrics:
399+
*
400+
* <ol>
401+
* <li><strong>Max Global Load:</strong> The maximum load among all nodes, computed as the sum
402+
* of the initial region load (from {@code regionCounter}) and the additional load (from
403+
* {@code additionalLoad}). Every index of {@code additionalLoad} is considered, including
* nodes that received no additional load.
404+
* <li><strong>Database Load Squared Sum:</strong> The sum of the squared maximum load per
405+
* database, which is computed by {@link #computeDatabaseLoadSquaredSum(int[], List, Map)}.
406+
* <li><strong>Scatter Value:</strong> The caller-supplied {@code currentScatter}, returned
407+
* unchanged.
408+
* </ol>
409+
*
410+
* <p>The metrics are returned as an array of three integers in the order: [maxGlobalLoad,
411+
* databaseLoadSquaredSum, scatterValue].
412+
*
413+
* @param additionalLoad an array representing the additional load assigned to each node during
414+
* migration.
415+
* @param currentScatter the current scatter value metric.
416+
* @param regionKeys a list of region identifiers (TConsensusGroupId) for which migration is being
417+
* computed.
418+
* @param regionDatabaseMap a mapping from each region identifier to its corresponding database
419+
* name.
420+
* @param currentAssignment an array where each element is the nodeId assigned for the
421+
* corresponding region in {@code regionKeys}.
422+
* @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue].
423+
*/
424+
private int[] getCurrentMetrics(
425+
int[] additionalLoad,
426+
int currentScatter,
427+
List<TConsensusGroupId> regionKeys,
428+
Map<TConsensusGroupId, String> regionDatabaseMap,
429+
int[] currentAssignment) {
430+
int currentMaxGlobalLoad = 0;
431+
// Calculate the maximum global load across all data nodes.
// NOTE(review): assumes regionCounter is at least as long as additionalLoad - confirm.
432+
for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
433+
int globalLoad = regionCounter[nodeId] + additionalLoad[nodeId];
434+
currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, globalLoad);
346435
}
436+
// Compute the database load squared sum using the helper method.
437+
int dbLoadSquaredSum =
438+
computeDatabaseLoadSquaredSum(currentAssignment, regionKeys, regionDatabaseMap);
347439
// Build current metrics in order [maxGlobalLoad, databaseLoadSquaredSum, scatterValue]
348-
return new int[] {currentMaxLoad, currentMaxDBLoad, currentScatter};
440+
return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
441+
}
442+
443+
/**
444+
* Compute the initial load for each database on each data node.
445+
*
* <p>Replaces the {@code initialDbLoad} field with a fresh map holding one counter array per
* database, each of length {@code regionCounter.length} and indexed by data node id.
*
446+
* @param databaseAllocatedRegionGroupMap Mapping of each database to its list of replica sets.
447+
*/
448+
private void computeInitialDbLoad(
449+
Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap) {
450+
initialDbLoad = new HashMap<>();
451+
452+
// Iterate over each database and count the number of regions on each data node across all its
453+
// replica sets.
454+
for (String database : databaseAllocatedRegionGroupMap.keySet()) {
455+
List<TRegionReplicaSet> replicaSets = databaseAllocatedRegionGroupMap.get(database);
456+
int[] load = new int[regionCounter.length];
457+
for (TRegionReplicaSet replicaSet : replicaSets) {
458+
for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
459+
int nodeId = location.getDataNodeId();
460+
// NOTE(review): the data node id is used directly as an array index; assumes every id in
// the replica sets is below regionCounter.length - confirm against prepare().
load[nodeId]++;
461+
}
462+
}
463+
initialDbLoad.put(database, load);
464+
}
349465
}
350466

351467
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ TRegionReplicaSet generateOptimalRegionReplicasDistribution(
5454
* @param availableDataNodeMap DataNodes that can be used for allocation
5555
* @param freeDiskSpaceMap The free disk space of the DataNodes
5656
* @param allocatedRegionGroups Allocated RegionGroups
57-
* @param regionDatabaseMap
57+
* @param regionDatabaseMap A mapping from each region identifier to its corresponding database
58+
* name
5859
* @param databaseAllocatedRegionGroupMap Allocated RegionGroups within the same Database with the
5960
* replica set
6061
* @param remainReplicasMap the remaining replica set excluding the removed DataNodes

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,16 @@ public void testSelectDestNode() {
147147
}
148148
}
149149
Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap = new HashMap<>();
150-
Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
151150
Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap = new HashMap<>();
152151
databaseAllocatedRegionGroupMap.put("database", allocateResult);
153152

154153
for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
155154
remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet);
156155
}
156+
Map<TConsensusGroupId, String> regionDatabaseMap = new HashMap<>();
157+
for (TRegionReplicaSet replicaSet : allocateResult) {
158+
regionDatabaseMap.put(replicaSet.getRegionId(), "database");
159+
}
157160
Map<TConsensusGroupId, TDataNodeConfiguration> result =
158161
GCR_ALLOCATOR.removeNodeReplicaSelect(
159162
AVAILABLE_DATA_NODE_MAP,

0 commit comments

Comments
 (0)