Skip to content

Commit

Permalink
feat: throttled tx metrics (#16130)
Browse files Browse the repository at this point in the history
Signed-off-by: Kim Rader <kim.rader@swirldslabs.com>
  • Loading branch information
kimbor authored and Evdokia-Georgieva committed Oct 31, 2024
1 parent 00dea12 commit 0039895
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.swirlds.common.metrics.IntegerPairAccumulator;
import com.swirlds.common.metrics.RunningAverageMetric;
import com.swirlds.common.metrics.RunningAverageMetric.Config;
import com.swirlds.metrics.api.Counter;
import com.swirlds.metrics.api.IntegerAccumulator;
import com.swirlds.metrics.api.Metrics;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -45,7 +46,10 @@ public class OpWorkflowMetrics {
.withDescription("average EVM gas used per second of consensus time")
.withFormat("%,13.6f");

private final Map<HederaFunctionality, TransactionMetric> transactionMetrics =
private final Map<HederaFunctionality, TransactionMetric> transactionDurationMetrics =
new EnumMap<>(HederaFunctionality.class);

private final Map<HederaFunctionality, Counter> transactionThrottleMetrics =
new EnumMap<>(HederaFunctionality.class);

private final RunningAverageMetric gasPerConsSec;
Expand All @@ -68,6 +72,8 @@ public OpWorkflowMetrics(@NonNull final Metrics metrics, @NonNull final ConfigPr
}
final var protoName = functionality.protoName();
final var name = protoName.substring(0, 1).toLowerCase() + protoName.substring(1);

// initialize the transaction duration metrics
final var maxConfig = new IntegerAccumulator.Config("app", name + "DurationMax")
.withDescription("The maximum duration of a " + name + " transaction in nanoseconds")
.withUnit("ns");
Expand All @@ -77,15 +83,21 @@ public OpWorkflowMetrics(@NonNull final Metrics metrics, @NonNull final ConfigPr
.withDescription("The average duration of a " + name + " transaction in nanoseconds")
.withUnit("ns");
final var avgMetric = metrics.getOrCreate(avgConfig);
transactionMetrics.put(functionality, new TransactionMetric(maxMetric, avgMetric));
transactionDurationMetrics.put(functionality, new TransactionMetric(maxMetric, avgMetric));

// initialize the transaction throttle metrics
final var throttledConfig = new Counter.Config("app", name + "ThrottledTxns")
.withDescription(
"The number of " + name + " transactions that were rejected due to throttle limits");
transactionThrottleMetrics.put(functionality, metrics.getOrCreate(throttledConfig));
}

final StatsConfig statsConfig = configProvider.getConfiguration().getConfigData(StatsConfig.class);
gasPerConsSec = metrics.getOrCreate(GAS_PER_CONS_SEC_CONFIG.withHalfLife(statsConfig.runningAvgHalfLifeSecs()));
}

/**
* Update the metrics for the given functionality
* Update the transaction duration metrics for the given functionality
*
* @param functionality the {@link HederaFunctionality} for which the metrics will be updated
* @param duration the duration of the operation in {@code ns}
Expand All @@ -95,7 +107,7 @@ public void updateDuration(@NonNull final HederaFunctionality functionality, fin
if (functionality == HederaFunctionality.NONE) {
return;
}
final var metric = transactionMetrics.get(functionality);
final var metric = transactionDurationMetrics.get(functionality);
if (metric != null) {
// We do not synchronize the update of the metrics. This may lead to a situation where the max value is
// is stored in one reporting interval and the average in another. This is acceptable as synchronizing
Expand All @@ -105,6 +117,23 @@ public void updateDuration(@NonNull final HederaFunctionality functionality, fin
}
}

/**
* Increment the throttled metrics for the given functionality, to track the number of transactions per second that
* failed due to throttling
*
* @param functionality the {@link HederaFunctionality} for which the throttled metrics will be updated
*/
public void incrementThrottled(@NonNull final HederaFunctionality functionality) {
requireNonNull(functionality, "functionality must not be null");
if (functionality == HederaFunctionality.NONE) {
return;
}
final var metric = transactionThrottleMetrics.get(functionality);
if (metric != null) {
metric.increment();
}
}

public void switchConsensusSecond() {
gasPerConsSec.update(gasUsedThisConsensusSecond);
gasUsedThisConsensusSecond = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.hedera.node.app.spi.authorization.Authorizer;
import com.hedera.node.app.spi.workflows.HandleException;
import com.hedera.node.app.spi.workflows.record.StreamBuilder;
import com.hedera.node.app.workflows.OpWorkflowMetrics;
import com.hedera.node.app.workflows.dispatcher.TransactionDispatcher;
import com.hedera.node.app.workflows.handle.dispatch.DispatchValidator;
import com.hedera.node.app.workflows.handle.dispatch.RecordFinalizer;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class DispatchProcessor {
private final TransactionDispatcher dispatcher;
private final EthereumTransactionHandler ethereumTransactionHandler;
private final NetworkInfo networkInfo;
private final OpWorkflowMetrics workflowMetrics;

@Inject
public DispatchProcessor(
Expand All @@ -90,7 +92,8 @@ public DispatchProcessor(
@NonNull final ExchangeRateManager exchangeRateManager,
@NonNull final TransactionDispatcher dispatcher,
@NonNull final EthereumTransactionHandler ethereumTransactionHandler,
final NetworkInfo networkInfo) {
@NonNull final NetworkInfo networkInfo,
@NonNull final OpWorkflowMetrics workflowMetrics) {
this.authorizer = requireNonNull(authorizer);
this.validator = requireNonNull(validator);
this.recordFinalizer = requireNonNull(recordFinalizer);
Expand All @@ -101,6 +104,7 @@ public DispatchProcessor(
this.dispatcher = requireNonNull(dispatcher);
this.ethereumTransactionHandler = requireNonNull(ethereumTransactionHandler);
this.networkInfo = requireNonNull(networkInfo);
this.workflowMetrics = requireNonNull(workflowMetrics);
}

/**
Expand Down Expand Up @@ -140,7 +144,6 @@ public void processDispatch(@NonNull final Dispatch dispatch) {
*
* @param dispatch the dispatch to be processed
* @param validationResult the due diligence report for the dispatch
* @return the work done by the dispatch
*/
private void tryHandle(@NonNull final Dispatch dispatch, @NonNull final ValidationResult validationResult) {
try {
Expand All @@ -162,8 +165,10 @@ private void tryHandle(@NonNull final Dispatch dispatch, @NonNull final Validati
// Since there is no easy way to say how much work was done in the failed dispatch,
// and current throttling is very rough-grained, we just return USER_TRANSACTION here
} catch (final ThrottleException e) {
final var functionality = dispatch.txnInfo().functionality();
workflowMetrics.incrementThrottled(functionality);
rollbackAndRechargeFee(dispatch, validationResult, e.getStatus());
if (dispatch.txnInfo().functionality() == ETHEREUM_TRANSACTION) {
if (functionality == ETHEREUM_TRANSACTION) {
ethereumTransactionHandler.handleThrottled(dispatch.handleContext());
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.hedera.node.app.state.DeduplicationCache;
import com.hedera.node.app.store.ReadableStoreFactory;
import com.hedera.node.app.throttle.SynchronizedThrottleAccumulator;
import com.hedera.node.app.workflows.OpWorkflowMetrics;
import com.hedera.node.app.workflows.SolvencyPreCheck;
import com.hedera.node.app.workflows.TransactionChecker;
import com.hedera.node.app.workflows.TransactionChecker.RequireMinValidLifetimeBuffer;
Expand Down Expand Up @@ -97,6 +98,7 @@ public final class IngestChecker {
private final Authorizer authorizer;
private final SynchronizedThrottleAccumulator synchronizedThrottleAccumulator;
private final InstantSource instantSource;
private final OpWorkflowMetrics workflowMetrics;

/**
* Constructor of the {@code IngestChecker}
Expand All @@ -111,6 +113,7 @@ public final class IngestChecker {
* @param feeManager the {@link FeeManager} that manages {@link com.hedera.node.app.spi.fees.FeeCalculator}s
* @param synchronizedThrottleAccumulator the {@link SynchronizedThrottleAccumulator} that checks transaction should be throttled
* @param instantSource the {@link InstantSource} that provides the current time
* @param workflowMetrics the {@link OpWorkflowMetrics} that manages the metrics for all operations
* @throws NullPointerException if one of the arguments is {@code null}
*/
@Inject
Expand All @@ -126,7 +129,8 @@ public IngestChecker(
@NonNull final FeeManager feeManager,
@NonNull final Authorizer authorizer,
@NonNull final SynchronizedThrottleAccumulator synchronizedThrottleAccumulator,
@NonNull final InstantSource instantSource) {
@NonNull final InstantSource instantSource,
@NonNull final OpWorkflowMetrics workflowMetrics) {
this.nodeAccount = requireNonNull(nodeAccount, "nodeAccount must not be null");
this.currentPlatformStatus = requireNonNull(currentPlatformStatus, "currentPlatformStatus must not be null");
this.transactionChecker = requireNonNull(transactionChecker, "transactionChecker must not be null");
Expand All @@ -139,6 +143,7 @@ public IngestChecker(
this.authorizer = requireNonNull(authorizer, "authorizer must not be null");
this.synchronizedThrottleAccumulator = requireNonNull(synchronizedThrottleAccumulator);
this.instantSource = requireNonNull(instantSource);
this.workflowMetrics = requireNonNull(workflowMetrics);
}

/**
Expand Down Expand Up @@ -193,10 +198,9 @@ public TransactionInfo runAllChecks(
// 4. Check throttles
assertThrottlingPreconditions(txInfo, configuration);
final var hederaConfig = configuration.getConfigData(HederaConfig.class);
if (hederaConfig.ingestThrottleEnabled()) {
if (synchronizedThrottleAccumulator.shouldThrottle(txInfo, state)) {
throw new PreCheckException(BUSY);
}
if (hederaConfig.ingestThrottleEnabled() && synchronizedThrottleAccumulator.shouldThrottle(txInfo, state)) {
workflowMetrics.incrementThrottled(functionality);
throw new PreCheckException(BUSY);
}

// 4a. Run pure checks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public final class QueryWorkflowImpl implements QueryWorkflow {
* @param feeManager the {@link FeeManager} to calculate the fees
* @param synchronizedThrottleAccumulator the {@link SynchronizedThrottleAccumulator} that checks transaction should be throttled
* @param instantSource the {@link InstantSource} to get the current time
* @param workflowMetrics the {@link OpWorkflowMetrics} to update the metrics
* @param shouldCharge If the workflow should charge for handling queries.
* @throws NullPointerException if one of the arguments is {@code null}
*/
Expand Down Expand Up @@ -270,6 +271,7 @@ public void handleQuery(@NonNull final Bytes requestBuffer, @NonNull final Buffe

// 5. Check query throttles
if (shouldCharge && synchronizedThrottleAccumulator.shouldThrottle(function, query, state, payerID)) {
workflowMetrics.incrementThrottled(function);
throw new PreCheckException(BUSY);
}

Expand Down
Loading

0 comments on commit 0039895

Please sign in to comment.