Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 9535] Add metrics for the cursor ack state #9618

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -677,4 +677,10 @@ Set<? extends Position> asyncReplayEntries(
* Get deleted batch indexes list for a batch message.
*/
long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);

/**
* @return the managed cursor stats MBean
*/
ManagedCursorMXBean getStats();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;

/**
* JMX Bean interface for ManagedCursor stats.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface ManagedCursorMXBean {

/**
* @return the ManagedCursor name
*/
String getName();

/**
* @return the ManagedLedger name
*/
String getLedgerName();

/**
* persist cursor by ledger
* @param success
*/
void persistToLedger(boolean success);

/**
* persist cursor by zookeeper
* @param success
*/
void persistToZookeeper(boolean success);

/**
* @return the number of persist cursor by ledger that succeed
*/
long getPersistLedgerSucceed();

/**
* @return the number of persist cursor by ledger that failed
*/
long getPersistLedgerErrors();

/**
* @return the number of persist cursor by zookeeper that succeed
*/
long getPersistZookeeperSucceed();

/**
* @return the number of persist cursor by zookeeper that failed
*/
long getPersistZookeeperErrors();

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
Expand Down Expand Up @@ -232,6 +233,8 @@ enum State {
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state");
protected volatile State state = null;

protected final ManagedCursorMXBean mbean;

@SuppressWarnings("checkstyle:javadoctype")
public interface VoidCallback {
void operationComplete();
Expand Down Expand Up @@ -268,6 +271,7 @@ public interface VoidCallback {
// Disable mark-delete rate limiter
markDeleteLimiter = null;
}
this.mbean = new ManagedCursorMXBeanImpl(this);
}

@Override
Expand Down Expand Up @@ -2517,6 +2521,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
startCreatingNewMetadataLedger();
}

mbean.persistToLedger(true);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand All @@ -2525,6 +2530,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);

mbean.persistToLedger(false);
// Before giving up, try to persist the position in the metadata store
persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() {
@Override
Expand All @@ -2534,13 +2540,15 @@ public void operationComplete(Void result, Stat stat) {
"[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}",
ledger.getName(), name, position);
}
mbean.persistToZookeeper(true);
callback.operationComplete();
}

@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
ledger.getName(), name, e.getMessage());
mbean.persistToZookeeper(false);
callback.operationFailed(createManagedLedgerException(rc));
}
}, true);
Expand Down Expand Up @@ -2904,6 +2912,11 @@ public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
}
}

@Override
public ManagedCursorMXBean getStats() {
return this.mbean;
}

void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;

import java.util.concurrent.atomic.LongAdder;

public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {

private final LongAdder persistLedgeSucceed = new LongAdder();
private final LongAdder persistLedgeFailed = new LongAdder();

private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
this.managedCursor = managedCursor;
}

@Override
public String getName() {
return this.managedCursor.getName();
}

@Override
public String getLedgerName() {
return this.managedCursor.getManagedLedger().getName();
}

@Override
public void persistToLedger(boolean success) {
if (success) {
persistLedgeSucceed.increment();
} else {
persistLedgeFailed.increment();
}
}

@Override
public void persistToZookeeper(boolean success) {
if (success) {
persistZookeeperSucceed.increment();
} else {
persistZookeeperFailed.increment();
}
}

@Override
public long getPersistLedgerSucceed() {
return persistLedgeSucceed.longValue();
}

@Override
public long getPersistLedgerErrors() {
return persistLedgeFailed.longValue();
}

@Override
public long getPersistZookeeperSucceed() {
return persistZookeeperSucceed.longValue();
}

@Override
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -349,6 +350,11 @@ public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
return new long[0];
}

@Override
public ManagedCursorMXBean getStats() {
return null;
}

public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx) {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.metrics;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.stats.Metrics;

public class ManagedCursorMetrics extends AbstractMetrics {

private Map<String, String> dimensionMap;
private List<Metrics> metricsCollection;

public ManagedCursorMetrics(PulsarService pulsar) {
super(pulsar);
this.metricsCollection = Lists.newArrayList();
this.dimensionMap = Maps.newHashMap();
}

@Override
public synchronized List<Metrics> generate() {
return aggregate();
}


/**
* Aggregation by namespace, ledger, cursor.
*
* @return List<Metrics>
*/
private List<Metrics> aggregate() {
metricsCollection.clear();
for (Map.Entry<String, ManagedLedgerImpl> e : getManagedLedgers().entrySet()) {
String ledgerName = e.getKey();
ManagedLedgerImpl ledger = e.getValue();
String namespace = parseNamespaceFromLedgerName(ledgerName);

ManagedCursorContainer cursorContainer = ledger.getCursors();
Iterator<ManagedCursor> cursorIterator = cursorContainer.iterator();

while (cursorIterator.hasNext()) {
ManagedCursorImpl cursor = (ManagedCursorImpl) cursorIterator.next();
ManagedCursorMXBean cStats = cursor.getStats();
dimensionMap.clear();
dimensionMap.put("namespace", namespace);
dimensionMap.put("ledger_name", ledgerName);
dimensionMap.put("cursor_name", cursor.getName());
Metrics metrics = createMetrics(dimensionMap);
metrics.put("brk_ml_cursor_persistLedgerSucceed", cStats.getPersistLedgerSucceed());
metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors());
metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed());
metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors());
metricsCollection.add(metrics);
}
}
return metricsCollection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -129,6 +130,10 @@ private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextO
parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
clusterName, Collector.Type.GAUGE, stream);

// generate managedCursor metrics
parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
clusterName, Collector.Type.GAUGE, stream);

// generate loadBalance metrics
parseMetricsToPrometheusMetrics(pulsar.getLoadManager().get().getLoadBalancingMetrics(),
clusterName, Collector.Type.GAUGE, stream);
Expand Down
Loading