diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index 74b76b23b22..3580e1688d0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -66,4 +66,5 @@ public interface ReplicationStats { String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION"; String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE"; String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS"; + String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED"; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 0d81a89e333..942c09d4fa2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED; +import static org.apache.bookkeeper.replication.ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP; @@ -136,6 +137,11 @@ public class ReplicationWorker implements Runnable { help = "the number of entries ReplicationWorker unable to read" ) private final Counter numEntriesUnableToReadForReplication; + @StatsDoc( + name = NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED, + help = "the number of not adhering placement policy ledgers re-replicated" + ) + private final Counter numNotAdheringPlacementLedgersReplicated; private final Map exceptionCounters; final LoadingCache replicationFailedLedgers; final LoadingCache> unableToReadEntriesForReplication; @@ -217,6 +223,8 @@ public ConcurrentSkipListSet load(Long key) throws Exception { .getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER); this.numEntriesUnableToReadForReplication = this.statsLogger .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION); + this.numNotAdheringPlacementLedgersReplicated = this.statsLogger + .getCounter(NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED); this.exceptionCounters = new HashMap(); this.onReadEntryFailureCallback = (ledgerid, entryid) -> { numEntriesUnableToReadForReplication.inc(); @@ -448,6 +456,7 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio boolean foundOpenFragments = false; long numFragsReplicated = 0; + long numNotAdheringPlacementFragsReplicated = 0; for (LedgerFragment ledgerFragment : fragments) { if (!ledgerFragment.isClosed()) { foundOpenFragments = true; @@ -461,6 +470,10 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio try { admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback); numFragsReplicated++; + if (ledgerFragment.getReplicateType() == LedgerFragment + .ReplicateType.DATA_NOT_ADHERING_PLACEMENT) { + numNotAdheringPlacementFragsReplicated++; + } } catch (BKException.BKBookieHandleNotAvailableException e) { LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e); } catch (BKException.BKLedgerRecoveryException e) { @@ -473,6 +486,9 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio if (numFragsReplicated > 0) { numLedgersReplicated.inc(); } + if (numNotAdheringPlacementFragsReplicated > 0) { + numNotAdheringPlacementLedgersReplicated.inc(); + } if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) { deferLedgerLockRelease = true; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index 88e05b995b8..b7752b8bcbf 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -1178,6 +1178,8 @@ public void testReplicationStats() throws Exception { statsLogger.getCounter(ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER); final Counter numLedgersReplicatedCounter = statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED); + final Counter numNotAdheringPlacementLedgersCounter = statsLogger + .getCounter(ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED); assertEquals("NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER", 1, numDeferLedgerLockReleaseOfFailedLedgerCounter.get().longValue()); @@ -1186,10 +1188,15 @@ public void testReplicationStats() throws Exception { assertFalse((boolean) result); assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED", 0, numLedgersReplicatedCounter.get().longValue()); + assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED", + 0, numNotAdheringPlacementLedgersCounter.get().longValue()); + } else { assertTrue((boolean) result); assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED", 1, numLedgersReplicatedCounter.get().longValue()); + assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED", + 1, numNotAdheringPlacementLedgersCounter.get().longValue()); } } catch (Exception e) { throw new RuntimeException(e);