Skip to content

Commit

Permalink
Add num not adhering placement ledgers replicated metric for Replicat…
Browse files Browse the repository at this point in the history
…ionWorker (apache#3652)

### Motivation

We have `NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED` in `ReplicationWorker`, which includes both `DATA_LOSS` and `DATA_NOT_ADHERING_PLACEMENT` data repair types. It is meaningful to add `NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED` metric for `DATA_NOT_ADHERING_PLACEMENT` type separately.
  • Loading branch information
wenbingshen authored Mar 7, 2023
1 parent 73c5a0e commit 08c3138
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ public interface ReplicationStats {
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE";
String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS";
String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
Expand Down Expand Up @@ -136,6 +137,11 @@ public class ReplicationWorker implements Runnable {
help = "the number of entries ReplicationWorker unable to read"
)
private final Counter numEntriesUnableToReadForReplication;
@StatsDoc(
name = NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED,
help = "the number of not adhering placement policy ledgers re-replicated"
)
private final Counter numNotAdheringPlacementLedgersReplicated;
private final Map<String, Counter> exceptionCounters;
final LoadingCache<Long, AtomicInteger> replicationFailedLedgers;
final LoadingCache<Long, ConcurrentSkipListSet<Long>> unableToReadEntriesForReplication;
Expand Down Expand Up @@ -217,6 +223,8 @@ public ConcurrentSkipListSet<Long> load(Long key) throws Exception {
.getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
this.numEntriesUnableToReadForReplication = this.statsLogger
.getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION);
this.numNotAdheringPlacementLedgersReplicated = this.statsLogger
.getCounter(NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED);
this.exceptionCounters = new HashMap<String, Counter>();
this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
numEntriesUnableToReadForReplication.inc();
Expand Down Expand Up @@ -448,6 +456,7 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio

boolean foundOpenFragments = false;
long numFragsReplicated = 0;
long numNotAdheringPlacementFragsReplicated = 0;
for (LedgerFragment ledgerFragment : fragments) {
if (!ledgerFragment.isClosed()) {
foundOpenFragments = true;
Expand All @@ -461,6 +470,10 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio
try {
admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback);
numFragsReplicated++;
if (ledgerFragment.getReplicateType() == LedgerFragment
.ReplicateType.DATA_NOT_ADHERING_PLACEMENT) {
numNotAdheringPlacementFragsReplicated++;
}
} catch (BKException.BKBookieHandleNotAvailableException e) {
LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e);
} catch (BKException.BKLedgerRecoveryException e) {
Expand All @@ -473,6 +486,9 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio
if (numFragsReplicated > 0) {
numLedgersReplicated.inc();
}
if (numNotAdheringPlacementFragsReplicated > 0) {
numNotAdheringPlacementLedgersReplicated.inc();
}

if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
deferLedgerLockRelease = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,8 @@ public void testReplicationStats() throws Exception {
statsLogger.getCounter(ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
final Counter numLedgersReplicatedCounter =
statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
final Counter numNotAdheringPlacementLedgersCounter = statsLogger
.getCounter(ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED);

assertEquals("NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER",
1, numDeferLedgerLockReleaseOfFailedLedgerCounter.get().longValue());
Expand All @@ -1186,10 +1188,15 @@ public void testReplicationStats() throws Exception {
assertFalse((boolean) result);
assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
0, numLedgersReplicatedCounter.get().longValue());
assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED",
0, numNotAdheringPlacementLedgersCounter.get().longValue());

} else {
assertTrue((boolean) result);
assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
1, numLedgersReplicatedCounter.get().longValue());
assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED",
1, numNotAdheringPlacementLedgersCounter.get().longValue());
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit 08c3138

Please sign in to comment.