Skip to content

Commit

Permalink
Warn if consensus client doesn't call transition configuration endpoi…
Browse files Browse the repository at this point in the history
…nt within 120 seconds (hyperledger#3569)

* add qos timer to engine_exchangeTransitionConfiguration

Signed-off-by: garyschulte <garyschulte@gmail.com>
Co-authored-by: Justin Florentine <justin+github@florentine.us>
  • Loading branch information
garyschulte and jflo committed May 2, 2022
1 parent 45b7ed2 commit adb09e5
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 18 deletions.
19 changes: 9 additions & 10 deletions besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public Runner build() {
Optional<JsonRpcHttpService> jsonRpcHttpService = Optional.empty();
Optional<JsonRpcHttpService> engineJsonRpcHttpService = Optional.empty();
if (jsonRpcConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> allJsonRpcMethods =
final Map<String, JsonRpcMethod> nonEngineMethods =
jsonRpcMethods(
protocolSchedule,
context,
Expand All @@ -592,7 +592,9 @@ public Runner build() {
miningCoordinator,
metricsSystem,
supportedCapabilities,
jsonRpcConfiguration.getRpcApis(),
jsonRpcConfiguration.getRpcApis().stream()
.filter(apiGroup -> !apiGroup.toLowerCase().startsWith("engine"))
.collect(Collectors.toList()),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
Expand All @@ -605,11 +607,6 @@ public Runner build() {
dataDir,
rpcEndpointServiceImpl);

final Map<String, JsonRpcMethod> nonEngineMethods =
allJsonRpcMethods.entrySet().stream()
.filter(entry -> !entry.getKey().toLowerCase().startsWith("engine"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

jsonRpcHttpService =
Optional.of(
new JsonRpcHttpService(
Expand Down Expand Up @@ -704,7 +701,7 @@ public Runner build() {
Optional<WebSocketService> webSocketService = Optional.empty();
Optional<WebSocketService> engineWebSocketService = Optional.empty();
if (webSocketConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> webSocketsJsonRpcMethods =
final Map<String, JsonRpcMethod> nonEngineMethods =
jsonRpcMethods(
protocolSchedule,
context,
Expand All @@ -716,7 +713,9 @@ public Runner build() {
miningCoordinator,
metricsSystem,
supportedCapabilities,
webSocketConfiguration.getRpcApis(),
webSocketConfiguration.getRpcApis().stream()
.filter(apiGroup -> !apiGroup.toLowerCase().startsWith("engine"))
.collect(Collectors.toList()),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
Expand Down Expand Up @@ -749,7 +748,7 @@ public Runner build() {
vertx,
webSocketConfiguration,
subscriptionManager,
webSocketsJsonRpcMethods,
nonEngineMethods,
privacyParameters,
protocolSchedule,
blockchainQueries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.engine;

import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod.ENGINE_EXCHANGE_TRANSITION_CONFIGURATION;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.ExecutionEngineJsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.EngineExchangeTransitionConfigurationParameter;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.EngineExchangeTransitionConfigurationResult;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.util.QosTimer;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
Expand All @@ -38,18 +40,32 @@ public class EngineExchangeTransitionConfiguration extends ExecutionEngineJsonRp
private static final Logger LOG =
LoggerFactory.getLogger(EngineExchangeTransitionConfiguration.class);

static final long QOS_TIMEOUT_MILLIS = 120000L;

private static final AtomicReference<QosTimer> qosTimerRef =
new AtomicReference<>(
new QosTimer(
QOS_TIMEOUT_MILLIS,
lastCall ->
LOG.warn(
"not called in {} seconds, consensus client may not be connected",
QOS_TIMEOUT_MILLIS / 1000L)));

public EngineExchangeTransitionConfiguration(
final Vertx vertx, final ProtocolContext protocolContext) {
super(vertx, protocolContext);
}

@Override
public String getName() {
return RpcMethod.ENGINE_EXCHANGE_TRANSITION_CONFIGURATION.getMethodName();
return ENGINE_EXCHANGE_TRANSITION_CONFIGURATION.getMethodName();
}

@Override
public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext) {
// update our QoS "last call time"
getQosTimer().resetTimer();

final EngineExchangeTransitionConfigurationParameter remoteTransitionConfiguration =
requestContext.getRequiredParameter(
0, EngineExchangeTransitionConfigurationParameter.class);
Expand Down Expand Up @@ -102,4 +118,9 @@ private JsonRpcResponse respondWith(
final EngineExchangeTransitionConfigurationResult transitionConfiguration) {
return new JsonRpcSuccessResponse(requestId, transitionConfiguration);
}

// QosTimer accessor for testing considerations
QosTimer getQosTimer() {
return qosTimerRef.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.engine;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.consensus.merge.MergeContext;
Expand All @@ -37,29 +41,31 @@
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ParsedExtraData;
import org.hyperledger.besu.evm.log.LogsBloomFilter;
import org.hyperledger.besu.util.QosTimer;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
@RunWith(VertxUnitRunner.class)
public class EngineExchangeTransitionConfigurationTest {
private EngineExchangeTransitionConfiguration method;
private static final Vertx vertx = Vertx.vertx();

@Mock private ProtocolContext protocolContext;
@Mock private MergeContext mergeContext;
private final ProtocolContext protocolContext = mock(ProtocolContext.class);
private final MergeContext mergeContext = mock(MergeContext.class);

@Before
public void setUp() {
Expand Down Expand Up @@ -166,6 +172,99 @@ public void shouldAlwaysReturnResultsInHex() throws JsonProcessingException {
.isEqualTo("0x0000000000000000000000000000000000000000000000000000000000000000");
}

@Test
public void shouldWarnWhenExchangeConfigNotCalledWithinTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 75L;
final Async async = ctx.async();
final AtomicInteger logCounter = new AtomicInteger(0);
final var spyMethod = spy(method);
final var spyTimer = spy(new QosTimer(TEST_QOS_TIMEOUT, z -> logCounter.incrementAndGet()));
when(spyMethod.getQosTimer()).thenReturn(spyTimer);
spyTimer.resetTimer();

vertx.setTimer(
100L,
z -> {
try {
// just once on construction:
verify(spyTimer, times(1)).resetTimer();
} catch (Exception ex) {
ctx.fail(ex);
}
// assert one warn:
ctx.assertEquals(1, logCounter.get());
async.complete();
});
}

@Test
public void shouldNotWarnWhenTimerExecutesBeforeTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 500L;
final Async async = ctx.async();
final AtomicInteger logCounter = new AtomicInteger(0);
final var spyMethod = spy(method);
final var spyTimer = spy(new QosTimer(TEST_QOS_TIMEOUT, z -> logCounter.incrementAndGet()));
when(spyMethod.getQosTimer()).thenReturn(spyTimer);
spyTimer.resetTimer();

vertx.setTimer(
50L,
z -> {
try {
// just once on construction:
verify(spyTimer, times(1)).resetTimer();
} catch (Exception ex) {
ctx.fail(ex);
}
// should not warn
ctx.assertEquals(0, logCounter.get());
async.complete();
});
}

@Test
public void shouldNotWarnWhenExchangeConfigurationCalledWithinTimeout(final TestContext ctx) {
final long TEST_QOS_TIMEOUT = 75L;
final Async async = ctx.async();
final AtomicInteger logCounter = new AtomicInteger(0);
final var spyMethod = spy(method);
final var spyTimer = spy(new QosTimer(TEST_QOS_TIMEOUT, z -> logCounter.incrementAndGet()));
when(mergeContext.getTerminalPoWBlock()).thenReturn(Optional.empty());
when(mergeContext.getTerminalTotalDifficulty()).thenReturn(Difficulty.of(1337L));
when(spyMethod.getQosTimer()).thenReturn(spyTimer);
spyTimer.resetTimer();

// call exchangeTransitionConfiguration 50 milliseconds hence to reset our QoS timer
vertx.setTimer(
50L,
z ->
spyMethod.syncResponse(
new JsonRpcRequestContext(
new JsonRpcRequest(
"2.0",
RpcMethod.ENGINE_EXCHANGE_TRANSITION_CONFIGURATION.getMethodName(),
new Object[] {
new EngineExchangeTransitionConfigurationParameter(
"24",
Hash.fromHexStringLenient("0x01").toHexString(),
new UnsignedLongParameter(0))
}))));

vertx.setTimer(
100L,
z -> {
try {
// once on construction, once on call:
verify(spyTimer, times(2)).resetTimer();
} catch (Exception ex) {
ctx.fail(ex);
}
// should not warn
ctx.assertEquals(0, logCounter.get());
async.complete();
});
}

private JsonRpcResponse resp(final EngineExchangeTransitionConfigurationParameter param) {
return method.response(
new JsonRpcRequestContext(
Expand Down
1 change: 1 addition & 0 deletions util/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
implementation 'org.xerial.snappy:snappy-java'

testImplementation 'junit:junit'
testImplementation 'io.vertx:vertx-unit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.mockito:mockito-core'
}
62 changes: 62 additions & 0 deletions util/src/main/java/org/hyperledger/besu/util/QosTimer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.util;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;

public class QosTimer {

private final Vertx timerVertx = Vertx.vertx();
private final AtomicLong timerId = new AtomicLong(Long.MAX_VALUE);
private final AtomicLong lastReset = new AtomicLong(System.currentTimeMillis());

private final long periodMillis;
private final Consumer<Long> consumerTask;

public QosTimer(final long periodMillis, final Consumer<Long> consumerTask) {
this.periodMillis = periodMillis;
this.consumerTask = consumerTask;
resetTimer();
}

public void resetTimer() {
lastReset.set(System.currentTimeMillis());
resetTimerHandler(timerHandler());
}

void resetTimerHandler(final Handler<Long> timerHandler) {
timerVertx.cancelTimer(timerId.get());
timerId.set(timerVertx.setTimer(periodMillis, timerHandler));
}

Handler<Long> timerHandler() {
return z -> {
var lastCall = getLastCallMillis();
var now = System.currentTimeMillis();
if (lastCall + periodMillis < now) {
consumerTask.accept(lastCall);
}
resetTimerHandler(timerHandler());
};
}

long getLastCallMillis() {
return lastReset.get();
}
}
Loading

0 comments on commit adb09e5

Please sign in to comment.