Commit 002725e

Fix region-aware placement policy not working with disk-weight-based placement (apache#2981)
### Motivation

When all of the following conditions hold:
1. the region-aware placement policy is configured
2. disk-weight-based placement is enabled
3. ensemble bookie selection falls back to random selection, for example because:
   - there are not enough regions
   - a region has fewer than 2 racks

ledger creation fails with the following exception:

```
12:15:36.459 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/test_v2] Encountered unexpected error when creating ledger
java.lang.NullPointerException: null
    at org.apache.bookkeeper.client.WeightedRandomSelectionImpl.getNextRandom(WeightedRandomSelectionImpl.java:150) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.selectRandomInternal(RackawareEnsemblePlacementPolicyImpl.java:748) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.selectRandom(RackawareEnsemblePlacementPolicyImpl.java:698) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.newEnsembleInternal(RackawareEnsemblePlacementPolicyImpl.java:409) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.newEnsemble(RackawareEnsemblePlacementPolicyImpl.java:372) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.newEnsemble(RackawareEnsemblePlacementPolicy.java:159) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.newEnsemble(RegionAwareEnsemblePlacementPolicy.java:303) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.BookieWatcherImpl.newEnsemble(BookieWatcherImpl.java:270) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:161) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:860) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncCreateLedger(ManagedLedgerImpl.java:3657) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.initializeBookKeeper(ManagedLedgerImpl.java:460) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.access$400(ManagedLedgerImpl.java:141) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationComplete(ManagedLedgerImpl.java:396) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationComplete(ManagedLedgerImpl.java:328) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30]
    at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda$getManagedLedgerInfo$2(MetaStoreImpl.java:97) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [io.streamnative-bookkeeper-common-4.14.3.1.jar:4.14.3.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at java.lang.Thread.run(Thread.java:834) [?:?]
```

The root cause is that, in `selectRandomInternal`, `wRSelection` has never been updated with a bookie map, so its fields `randomMax` and `cummulativeMap` are not initialized.

### Modification

1. Update `wRSelection`'s map in `selectRandomInternal` on every call, whether or not `wRSelection` has already been set.
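
The failure mode described above can be illustrated with a minimal sketch. The `Selector` class here is a simplified, hypothetical stand-in written for this example, not the actual `WeightedRandomSelectionImpl`; the field names, the weight type (`Long`), and the helper methods are assumptions. It only shows why calling a weighted selector before its cumulative map has been populated ends in a `NullPointerException`, and why populating the map before every selection avoids it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical stand-in for a weighted random selector.
// This is NOT the BookKeeper implementation; it only mimics the reported state.
public class WeightedSelectionSketch {

    static final class Selector<T> {
        // Left unset until updateMap() runs, mirroring the uninitialized fields.
        private TreeMap<Double, T> cumulativeMap;
        private double randomMax;

        // Build cumulative weight boundaries: each key is the running total
        // of the weights of the entries inserted before it.
        void updateMap(Map<T, Long> weights) {
            TreeMap<Double, T> next = new TreeMap<>();
            double total = 0.0;
            for (Map.Entry<T, Long> e : weights.entrySet()) {
                next.put(total, e.getKey());
                total += Math.max(1L, e.getValue()); // treat weights below 1 as 1
            }
            cumulativeMap = next;
            randomMax = total;
        }

        T getNextRandom() {
            double r = randomMax * Math.random();
            // Throws NullPointerException if updateMap() was never called.
            Double key = cumulativeMap.floorKey(r);
            return cumulativeMap.get(key);
        }
    }

    // Reproduces the bug pattern: select without ever updating the map.
    static boolean failsWithoutUpdate() {
        try {
            new Selector<String>().getNextRandom();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Mirrors the fix pattern: refresh the map before selecting.
    static String selectAfterUpdate(Map<String, Long> weights) {
        Selector<String> selector = new Selector<>();
        selector.updateMap(weights);
        return selector.getNextRandom();
    }

    public static void main(String[] args) {
        Map<String, Long> weights = new HashMap<>();
        weights.put("bookie-a", 10L);
        weights.put("bookie-b", 30L);
        System.out.println("NPE without update: " + failsWithoutUpdate());
        System.out.println("Selected after update: " + selectAfterUpdate(weights));
    }
}
```

In this sketch the weights must be non-empty; an empty map would leave `floorKey` with nothing to return. Rebuilding the map on each call costs extra work, but it keeps the candidate set in step with the bookies passed to that particular selection.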
1 parent 3570f83 commit 002725e

3 files changed: +48 -14 lines changed


bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java

Lines changed: 13 additions & 12 deletions
@@ -714,20 +714,21 @@ protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelect
                 throw new BKNotEnoughBookiesException();
             }
             if (wRSelection == null) {
-                Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>();
-                for (BookieNode n : bookiesToSelectFrom) {
-                    if (excludeBookies.contains(n)) {
-                        continue;
-                    }
-                    if (this.bookieInfoMap.containsKey(n)) {
-                        rackMap.put(n, this.bookieInfoMap.get(n));
-                    } else {
-                        rackMap.put(n, new BookieInfo());
-                    }
-                }
                 wRSelection = new WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
-                wRSelection.updateMap(rackMap);
             }
+
+            Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>();
+            for (BookieNode n : bookiesToSelectFrom) {
+                if (excludeBookies.contains(n)) {
+                    continue;
+                }
+                if (this.bookieInfoMap.containsKey(n)) {
+                    rackMap.put(n, this.bookieInfoMap.get(n));
+                } else {
+                    rackMap.put(n, new BookieInfo());
+                }
+            }
+            wRSelection.updateMap(rackMap);
         } else {
             Collections.shuffle(bookiesToSelectFrom);
         }

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java

Lines changed: 0 additions & 2 deletions
@@ -150,8 +150,6 @@ public T getNextRandom() {
             Double randomNum = randomMax * Math.random();
             // find the nearest key in the map corresponding to the randomNum
             Double key = cummulativeMap.floorKey(randomNum);
-            //LOG.info("Random max: {} CummulativeMap size: {} selected key: {}", randomMax, cummulativeMap.size(),
-            //    key);
             return cummulativeMap.get(key);
         } finally {
             rwLock.readLock().unlock();

bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java

Lines changed: 35 additions & 0 deletions
@@ -1524,4 +1524,39 @@ public void testNewEnsembleSetWithFiveRegions() throws Exception {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
     }
+
+    public void testRegionsWithDiskWeight() throws Exception {
+        repp.uninitalize();
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        conf.setProperty(REPP_ENABLE_VALIDATION, false);
+        conf.setDiskWeightBasedPlacementEnabled(true);
+        repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r3");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region3/r11");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region4/r13");
+        StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region5/r23");
+        // Update cluster
+        Set<BookieId> addrs = new HashSet<BookieId>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+        List<BookieId> ensemble = repp.newEnsemble(3, 3, 2, null,
+                new HashSet<>()).getResult();
+
+        assertEquals(3, ensemble.size());
+    }
 }
