From 4266035224f4ea402f2c826095d3d08aeef904ce Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 4 Nov 2024 12:58:58 +0100 Subject: [PATCH] Decouple block and blobs publishing\import (#8728) --- .../ValidatorApiHandlerIntegrationTest.java | 170 +++++++--- .../validator/coordinator/BlockFactory.java | 3 +- .../coordinator/BlockFactoryDeneb.java | 9 +- .../coordinator/BlockFactoryPhase0.java | 4 +- .../BlockOperationSelectorFactory.java | 20 +- .../MilestoneBasedBlockFactory.java | 8 +- .../coordinator/ValidatorApiHandler.java | 23 +- .../publisher/AbstractBlockPublisher.java | 83 +++-- .../publisher/BlockPublisherDeneb.java | 45 +-- .../publisher/BlockPublisherPhase0.java | 37 ++- .../MilestoneBasedBlockPublisher.java | 17 +- .../coordinator/AbstractBlockFactoryTest.java | 3 +- .../BlockOperationSelectorFactoryTest.java | 26 +- .../coordinator/ValidatorApiHandlerTest.java | 292 +----------------- .../publisher/AbstractBlockPublisherTest.java | 191 +++++++++--- .../publisher/BlockPublisherDenebTest.java | 76 +++++ .../publisher/BlockPublisherPhase0Test.java | 72 +++++ .../trackers/BlockPublishingPerformance.java | 9 +- .../BlockPublishingPerformanceImpl.java | 9 +- .../blobs/BlobSidecarManager.java | 3 +- .../block/BlockImportChannel.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 30 +- .../time/PerformanceTracker.java | 4 +- .../teku/networking/eth2/P2PConfig.java | 20 +- .../beaconchain/BeaconChainController.java | 26 +- .../pegasys/teku/cli/options/P2POptions.java | 14 +- .../teku/cli/options/P2POptionsTest.java | 29 ++ 27 files changed, 721 insertions(+), 504 deletions(-) create mode 100644 beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDenebTest.java create mode 100644 beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0Test.java diff --git a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java index 93d2d3f72c6..85a683ef102 100644 --- a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java +++ b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java @@ -15,15 +15,20 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE; +import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.NOT_REQUIRED; +import java.util.List; import java.util.Optional; +import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import tech.pegasys.teku.api.ChainDataProvider; import tech.pegasys.teku.api.NetworkDataProvider; import tech.pegasys.teku.api.NodeDataProvider; @@ -31,24 +36,37 @@ import tech.pegasys.teku.beacon.sync.events.SyncStateProvider; import tech.pegasys.teku.beacon.sync.events.SyncStateTracker; import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionAndPublishingPerformanceFactory; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.metrics.Validator.ValidatorDutyMetricUtils; +import tech.pegasys.teku.infrastructure.ssz.SszList; import tech.pegasys.teku.infrastructure.time.SystemTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.P2PConfig; import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.SpecVersion; +import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer; import tech.pegasys.teku.spec.datastructures.operations.AttestationData; import tech.pegasys.teku.spec.datastructures.state.Checkpoint; +import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; +import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; +import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; @@ -58,16 +76,19 @@ import tech.pegasys.teku.storage.server.StateStorageMode; import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; import tech.pegasys.teku.storage.storageSystem.StorageSystem; +import tech.pegasys.teku.validator.api.SendSignedBlockResult; import tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker; +import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher; +@TestSpecContext(milestone = {SpecMilestone.PHASE0, SpecMilestone.DENEB}) public class ValidatorApiHandlerIntegrationTest { + private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create(); // Use full storage system private final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(StateStorageMode.ARCHIVE); private final CombinedChainDataClient combinedChainDataClient = storageSystem.combinedChainDataClient(); - private final Spec spec = TestSpecFactory.createMinimalPhase0(); // Other dependencies are mocked, but these can be updated as needed private final SyncStateProvider syncStateProvider = mock(SyncStateTracker.class); @@ -100,45 +121,93 @@ public class ValidatorApiHandlerIntegrationTest { mock(SyncCommitteeSubscriptionManager.class); private final DutyMetrics dutyMetrics = mock(DutyMetrics.class); - private final ValidatorApiHandler handler = - new ValidatorApiHandler( - chainDataProvider, - nodeDataProvider, - networkDataProvider, - combinedChainDataClient, - syncStateProvider, - blockFactory, - blockImportChannel, - blockGossipChannel, - blockBlobSidecarsTrackersPool, - blobSidecarGossipChannel, - attestationPool, - attestationManager, - attestationTopicSubscriber, - activeValidatorTracker, - dutyMetrics, - performanceTracker, - spec, - forkChoiceTrigger, - proposersDataManager, - syncCommitteeMessagePool, - syncCommitteeContributionPool, - syncCommitteeSubscriptionManager, - new BlockProductionAndPublishingPerformanceFactory( - new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0)); + + private ValidatorApiHandler handler; @BeforeEach - public void setup() { + public void setup(final SpecContext specContext) { when(syncStateProvider.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC); when(forkChoiceTrigger.prepareForAttestationProduction(any())).thenReturn(SafeFuture.COMPLETE); when(dutyMetrics.getValidatorDutyMetric()) .thenReturn(ValidatorDutyMetricUtils.createValidatorDutyMetric(new StubMetricsSystem())); + + when(blockGossipChannel.publishBlock(any())).thenReturn(SafeFuture.COMPLETE); + when(blobSidecarGossipChannel.publishBlobSidecar(any())).thenReturn(SafeFuture.COMPLETE); + when(blobSidecarGossipChannel.publishBlobSidecars(any())).thenReturn(SafeFuture.COMPLETE); + + doAnswer(invocation -> SafeFuture.completedFuture(invocation.getArgument(0))) + .when(blockFactory) + .unblindSignedBlockIfBlinded(any(), any()); + + // BlobSidecar builder + doAnswer( + invocation -> { + final SignedBlockContainer blockContainer = invocation.getArgument(0); + final SpecVersion asspecVersion = + specContext.getSpec().forMilestone(SpecMilestone.DENEB); + if (asspecVersion == null) { + return List.of(); + } + final MiscHelpersDeneb miscHelpersDeneb = + MiscHelpersDeneb.required(asspecVersion.miscHelpers()); + if (blockContainer.getBlobs().isEmpty()) { + return List.of(); + } + final SszList blobs = blockContainer.getBlobs().orElseThrow(); + final SszList proofs = blockContainer.getKzgProofs().orElseThrow(); + return IntStream.range(0, blobs.size()) + .mapToObj( + index -> + miscHelpersDeneb.constructBlobSidecar( + blockContainer.getSignedBlock(), + UInt64.valueOf(index), + blobs.get(index), + proofs.get(index))) + .toList(); + }) + .when(blockFactory) + .createBlobSidecars(any()); + + handler = + new ValidatorApiHandler( + chainDataProvider, + nodeDataProvider, + networkDataProvider, + combinedChainDataClient, + syncStateProvider, + blockFactory, + attestationPool, + attestationManager, + attestationTopicSubscriber, + activeValidatorTracker, + dutyMetrics, + performanceTracker, + specContext.getSpec(), + forkChoiceTrigger, + proposersDataManager, + syncCommitteeMessagePool, + syncCommitteeContributionPool, + syncCommitteeSubscriptionManager, + new BlockProductionAndPublishingPerformanceFactory( + new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0), + new MilestoneBasedBlockPublisher( + asyncRunner, + specContext.getSpec(), + blockFactory, + blockImportChannel, + blockGossipChannel, + blockBlobSidecarsTrackersPool, + blobSidecarGossipChannel, + performanceTracker, + dutyMetrics, + P2PConfig.DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED)); } - @Test - public void createAttestationData_withRecentBlockAvailable() { + @TestTemplate + public void createAttestationData_withRecentBlockAvailable(final SpecContext specContext) { + specContext.assumeIsNotOneOf(SpecMilestone.DENEB); final UInt64 targetEpoch = UInt64.valueOf(3); - final UInt64 targetEpochStartSlot = spec.computeStartSlotAtEpoch(targetEpoch); + final UInt64 targetEpochStartSlot = specContext.getSpec().computeStartSlotAtEpoch(targetEpoch); final UInt64 targetSlot = targetEpochStartSlot.plus(2); final SignedBlockAndState genesis = chainUpdater.initializeGenesis(); @@ -167,12 +236,14 @@ public void createAttestationData_withRecentBlockAvailable() { assertThat(attestation.getTarget()).isEqualTo(expectedTarget); } - @Test - public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch() { + @TestTemplate + public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch( + final SpecContext specContext) { + specContext.assumeIsNotOneOf(SpecMilestone.DENEB); final UInt64 latestEpoch = UInt64.valueOf(2); - final UInt64 latestSlot = spec.computeStartSlotAtEpoch(latestEpoch).plus(ONE); + final UInt64 latestSlot = specContext.getSpec().computeStartSlotAtEpoch(latestEpoch).plus(ONE); final UInt64 targetEpoch = UInt64.valueOf(latestEpoch.longValue() + 3); - final UInt64 targetEpochStartSlot = spec.computeStartSlotAtEpoch(targetEpoch); + final UInt64 targetEpochStartSlot = specContext.getSpec().computeStartSlotAtEpoch(targetEpoch); final UInt64 targetSlot = targetEpochStartSlot.plus(2); final SignedBlockAndState genesis = chainUpdater.initializeGenesis(); @@ -196,4 +267,27 @@ public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch() { assertThat(attestation.getSource()).isEqualTo(genesisCheckpoint); assertThat(attestation.getTarget()).isEqualTo(expectedTarget); } + + @TestTemplate + void sendSignedBlock_shouldImportAndPublishBlock(final SpecContext specContext) { + final SignedBeaconBlock block = specContext.getDataStructureUtil().randomSignedBeaconBlock(5); + + when(blockImportChannel.importBlock(block, NOT_REQUIRED)) + .thenReturn(prepareBlockImportResult(BlockImportResult.successful(block))); + final SafeFuture result = handler.sendSignedBlock(block, NOT_REQUIRED); + assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); + + if (specContext.getSpecMilestone() == SpecMilestone.DENEB) { + verify(blobSidecarGossipChannel).publishBlobSidecars(any()); + } + verify(blockGossipChannel).publishBlock(block); + verify(blockImportChannel).importBlock(block, NOT_REQUIRED); + } + + private SafeFuture prepareBlockImportResult( + final BlockImportResult blockImportResult) { + return SafeFuture.completedFuture( + new BlockImportAndBroadcastValidationResults( + SafeFuture.completedFuture(blockImportResult))); + } } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactory.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactory.java index ae0fa28b71f..94abffb2fb1 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactory.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactory.java @@ -40,6 +40,5 @@ SafeFuture createUnsignedBlock( SafeFuture unblindSignedBlockIfBlinded( SignedBeaconBlock maybeBlindedBlock, BlockPublishingPerformance blockPublishingPerformance); - List createBlobSidecars( - SignedBlockContainer blockContainer, BlockPublishingPerformance blockPublishingPerformance); + List createBlobSidecars(SignedBlockContainer blockContainer); } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryDeneb.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryDeneb.java index b0490b677a2..fbe29cbc279 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryDeneb.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryDeneb.java @@ -18,7 +18,6 @@ import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance; -import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; @@ -69,12 +68,8 @@ public SafeFuture createUnsignedBlock( } @Override - public List createBlobSidecars( - final SignedBlockContainer blockContainer, - final BlockPublishingPerformance blockPublishingPerformance) { - return operationSelector - .createBlobSidecarsSelector(blockPublishingPerformance) - .apply(blockContainer); + public List createBlobSidecars(final SignedBlockContainer blockContainer) { + return operationSelector.createBlobSidecarsSelector().apply(blockContainer); } private BlockContents createBlockContents( diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryPhase0.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryPhase0.java index 7b9a90d2044..397166c6e0c 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryPhase0.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockFactoryPhase0.java @@ -104,9 +104,7 @@ public SafeFuture unblindSignedBlockIfBlinded( } @Override - public List createBlobSidecars( - final SignedBlockContainer blockContainer, - final BlockPublishingPerformance blockPublishingPerformance) { + public List createBlobSidecars(final SignedBlockContainer blockContainer) { return Collections.emptyList(); } } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactory.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactory.java index 20357e8c7c3..7eff3b34f13 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactory.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactory.java @@ -451,8 +451,7 @@ public Function> createBlobsBundleSelector( }; } - public Function> createBlobSidecarsSelector( - final BlockPublishingPerformance blockPublishingPerformance) { + public Function> createBlobSidecarsSelector() { return blockContainer -> { final UInt64 slot = blockContainer.getSlot(); final SignedBeaconBlock block = blockContainer.getSignedBlock(); @@ -505,17 +504,12 @@ public Function> createBlobSidecarsSelec final MiscHelpersDeneb miscHelpersDeneb = MiscHelpersDeneb.required(spec.atSlot(slot).miscHelpers()); - final List blobSidecars = - IntStream.range(0, blobs.size()) - .mapToObj( - index -> - miscHelpersDeneb.constructBlobSidecar( - block, UInt64.valueOf(index), blobs.get(index), proofs.get(index))) - .toList(); - - blockPublishingPerformance.blobSidecarsPrepared(); - - return blobSidecars; + return IntStream.range(0, blobs.size()) + .mapToObj( + index -> + miscHelpersDeneb.constructBlobSidecar( + block, UInt64.valueOf(index), blobs.get(index), proofs.get(index))) + .toList(); }; } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/MilestoneBasedBlockFactory.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/MilestoneBasedBlockFactory.java index dfa2e336286..9af770aae51 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/MilestoneBasedBlockFactory.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/MilestoneBasedBlockFactory.java @@ -93,13 +93,9 @@ public SafeFuture unblindSignedBlockIfBlinded( } @Override - public List createBlobSidecars( - final SignedBlockContainer blockContainer, - final BlockPublishingPerformance blockPublishingPerformance) { + public List createBlobSidecars(final SignedBlockContainer blockContainer) { final SpecMilestone milestone = getMilestone(blockContainer.getSlot()); - return registeredFactories - .get(milestone) - .createBlobSidecars(blockContainer, blockPublishingPerformance); + return registeredFactories.get(milestone).createBlobSidecars(blockContainer); } private SpecMilestone getMilestone(final UInt64 slot) { diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java index a82a02e1fa4..46c84df772b 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java @@ -65,8 +65,6 @@ import tech.pegasys.teku.infrastructure.collections.LimitedMap; import tech.pegasys.teku.infrastructure.ssz.SszList; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; -import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; import tech.pegasys.teku.spec.Spec; @@ -92,8 +90,6 @@ import tech.pegasys.teku.spec.logic.common.util.SyncCommitteeUtil; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; -import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; -import tech.pegasys.teku.statetransition.block.BlockImportChannel; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; @@ -108,7 +104,6 @@ import tech.pegasys.teku.validator.coordinator.duties.AttesterDutiesGenerator; import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; import tech.pegasys.teku.validator.coordinator.publisher.BlockPublisher; -import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher; public class ValidatorApiHandler implements ValidatorApiChannel { @@ -155,10 +150,6 @@ public ValidatorApiHandler( final CombinedChainDataClient combinedChainDataClient, final SyncStateProvider syncStateProvider, final BlockFactory blockFactory, - final BlockImportChannel blockImportChannel, - final BlockGossipChannel blockGossipChannel, - final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, - final BlobSidecarGossipChannel blobSidecarGossipChannel, final AggregatingAttestationPool attestationPool, final AttestationManager attestationManager, final AttestationTopicSubscriber attestationTopicSubscriber, @@ -172,7 +163,8 @@ public ValidatorApiHandler( final SyncCommitteeContributionPool syncCommitteeContributionPool, final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager, final BlockProductionAndPublishingPerformanceFactory - blockProductionAndPublishingPerformanceFactory) { + blockProductionAndPublishingPerformanceFactory, + final BlockPublisher blockPublisher) { this.blockProductionAndPublishingPerformanceFactory = blockProductionAndPublishingPerformanceFactory; this.chainDataProvider = chainDataProvider; @@ -193,16 +185,7 @@ public ValidatorApiHandler( this.syncCommitteeContributionPool = syncCommitteeContributionPool; this.syncCommitteeSubscriptionManager = syncCommitteeSubscriptionManager; this.proposersDataManager = proposersDataManager; - this.blockPublisher = - new MilestoneBasedBlockPublisher( - spec, - blockFactory, - blockImportChannel, - blockGossipChannel, - blockBlobSidecarsTrackersPool, - blobSidecarGossipChannel, - performanceTracker, - dutyMetrics); + this.blockPublisher = blockPublisher; this.attesterDutiesGenerator = new AttesterDutiesGenerator(spec); } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java index e22fef42ac0..93b39d9bf74 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java @@ -16,11 +16,15 @@ import static tech.pegasys.teku.infrastructure.logging.ValidatorLogger.VALIDATOR_LOGGER; import static tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason.FAILED_BROADCAST_VALIDATION; +import com.google.common.base.Suppliers; import java.util.List; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer; @@ -37,20 +41,31 @@ public abstract class AbstractBlockPublisher implements BlockPublisher { private static final Logger LOG = LogManager.getLogger(); + private final AsyncRunner asyncRunner; + + private final boolean gossipBlobsAfterBlock; + protected final BlockFactory blockFactory; protected final BlockImportChannel blockImportChannel; + protected final BlockGossipChannel blockGossipChannel; protected final PerformanceTracker performanceTracker; protected final DutyMetrics dutyMetrics; public AbstractBlockPublisher( + final AsyncRunner asyncRunner, final BlockFactory blockFactory, + final BlockGossipChannel blockGossipChannel, final BlockImportChannel blockImportChannel, final PerformanceTracker performanceTracker, - final DutyMetrics dutyMetrics) { + final DutyMetrics dutyMetrics, + final boolean gossipBlobsAfterBlock) { + this.asyncRunner = asyncRunner; this.blockFactory = blockFactory; this.blockImportChannel = blockImportChannel; + this.blockGossipChannel = blockGossipChannel; this.performanceTracker = performanceTracker; this.dutyMetrics = dutyMetrics; + this.gossipBlobsAfterBlock = gossipBlobsAfterBlock; } @Override @@ -62,30 +77,31 @@ public SafeFuture sendSignedBlock( .unblindSignedBlockIfBlinded(blockContainer.getSignedBlock(), blockPublishingPerformance) .thenPeek(performanceTracker::saveProducedBlock) .thenCompose( - signedBlock -> { - // creating blob sidecars after unblinding the block to ensure in the blinded flow we - // will have the cached builder payload - final List blobSidecars = - blockFactory.createBlobSidecars(blockContainer, blockPublishingPerformance); - return gossipAndImportUnblindedSignedBlockAndBlobSidecars( - signedBlock, blobSidecars, broadcastValidationLevel, blockPublishingPerformance); - }) + // creating blob sidecars after unblinding the block to ensure in the blinded flow we + // will have the cached builder payload + signedBlock -> + gossipAndImportUnblindedSignedBlockAndBlobSidecars( + signedBlock, + Suppliers.memoize(() -> blockFactory.createBlobSidecars(blockContainer)), + broadcastValidationLevel, + blockPublishingPerformance)) .thenCompose(result -> calculateResult(blockContainer, result, blockPublishingPerformance)); } private SafeFuture gossipAndImportUnblindedSignedBlockAndBlobSidecars( final SignedBeaconBlock block, - final List blobSidecars, + final Supplier> blobSidecars, final BroadcastValidationLevel broadcastValidationLevel, final BlockPublishingPerformance blockPublishingPerformance) { if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) { // when broadcast validation is disabled, we can publish the block (and blob sidecars) // immediately and then import - publishBlockAndBlobSidecars(block, blobSidecars, blockPublishingPerformance); - return importBlockAndBlobSidecars( - block, blobSidecars, broadcastValidationLevel, blockPublishingPerformance); + publishBlockAndBlobs(block, blobSidecars, blockPublishingPerformance); + + importBlobSidecars(blobSidecars.get(), blockPublishingPerformance); + return importBlock(block, broadcastValidationLevel, blockPublishingPerformance); } // when broadcast validation is enabled, we need to wait for the validation to complete before @@ -93,15 +109,21 @@ public SafeFuture sendSignedBlock( final SafeFuture blockImportAndBroadcastValidationResults = - importBlockAndBlobSidecars( - block, blobSidecars, broadcastValidationLevel, blockPublishingPerformance); + importBlock(block, broadcastValidationLevel, blockPublishingPerformance); + + // prepare and import blob sidecars in parallel with block import + asyncRunner + .runAsync(() -> importBlobSidecars(blobSidecars.get(), blockPublishingPerformance)) + .finish( + error -> + LOG.error("Failed to import blob sidecars for slot {}", block.getSlot(), error)); blockImportAndBroadcastValidationResults .thenCompose(BlockImportAndBroadcastValidationResults::broadcastValidationResult) .thenAccept( broadcastValidationResult -> { if (broadcastValidationResult == BroadcastValidationResult.SUCCESS) { - publishBlockAndBlobSidecars(block, blobSidecars, blockPublishingPerformance); + publishBlockAndBlobs(block, blobSidecars, blockPublishingPerformance); LOG.debug("Block (and blob sidecars) publishing initiated"); } else { LOG.warn( @@ -120,16 +142,33 @@ public SafeFuture sendSignedBlock( return blockImportAndBroadcastValidationResults; } - abstract SafeFuture importBlockAndBlobSidecars( + private void publishBlockAndBlobs( + final SignedBeaconBlock block, + final Supplier> blobSidecars, + final BlockPublishingPerformance blockPublishingPerformance) { + + if (gossipBlobsAfterBlock) { + publishBlock(block, blockPublishingPerformance) + .always(() -> publishBlobSidecars(blobSidecars.get(), blockPublishingPerformance)); + } else { + publishBlock(block, blockPublishingPerformance).ifExceptionGetsHereRaiseABug(); + publishBlobSidecars(blobSidecars.get(), blockPublishingPerformance); + } + } + + abstract SafeFuture importBlock( SignedBeaconBlock block, - List blobSidecars, BroadcastValidationLevel broadcastValidationLevel, BlockPublishingPerformance blockPublishingPerformance); - abstract void publishBlockAndBlobSidecars( - SignedBeaconBlock block, - List blobSidecars, - BlockPublishingPerformance blockPublishingPerformance); + abstract void importBlobSidecars( + List blobSidecars, BlockPublishingPerformance blockPublishingPerformance); + + abstract SafeFuture publishBlock( + SignedBeaconBlock block, BlockPublishingPerformance blockPublishingPerformance); + + abstract void publishBlobSidecars( + List blobSidecars, BlockPublishingPerformance blockPublishingPerformance); private SafeFuture calculateResult( final SignedBlockContainer maybeBlindedBlockContainer, diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java index bfe8329ed6e..d7f054ac629 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java @@ -15,57 +15,60 @@ import java.util.List; import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; -import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; +import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; import tech.pegasys.teku.statetransition.block.BlockImportChannel; -import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults; import tech.pegasys.teku.validator.coordinator.BlockFactory; import tech.pegasys.teku.validator.coordinator.DutyMetrics; import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; -public class BlockPublisherDeneb extends AbstractBlockPublisher { +public class BlockPublisherDeneb extends BlockPublisherPhase0 { - private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; - private final BlockGossipChannel blockGossipChannel; - private final BlobSidecarGossipChannel blobSidecarGossipChannel; + protected final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; + protected final BlobSidecarGossipChannel blobSidecarGossipChannel; public BlockPublisherDeneb( + final AsyncRunner asyncRunner, final BlockFactory blockFactory, final BlockImportChannel blockImportChannel, final BlockGossipChannel blockGossipChannel, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final BlobSidecarGossipChannel blobSidecarGossipChannel, final PerformanceTracker performanceTracker, - final DutyMetrics dutyMetrics) { - super(blockFactory, blockImportChannel, performanceTracker, dutyMetrics); + final DutyMetrics dutyMetrics, + final boolean gossipBlobsAfterBlock) { + super( + asyncRunner, + blockFactory, + blockGossipChannel, + blockImportChannel, + performanceTracker, + dutyMetrics, + gossipBlobsAfterBlock); this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; - this.blockGossipChannel = blockGossipChannel; this.blobSidecarGossipChannel = blobSidecarGossipChannel; } @Override - SafeFuture importBlockAndBlobSidecars( - final SignedBeaconBlock block, + void importBlobSidecars( final List blobSidecars, - final BroadcastValidationLevel broadcastValidationLevel, final BlockPublishingPerformance blockPublishingPerformance) { - // provide blobs for the block before importing it - blockBlobSidecarsTrackersPool.onCompletedBlockAndBlobSidecars(block, blobSidecars); - return blockImportChannel.importBlock(block, broadcastValidationLevel); + blobSidecars.forEach( + blobSidecar -> + blockBlobSidecarsTrackersPool.onNewBlobSidecar( + blobSidecar, RemoteOrigin.LOCAL_PROPOSAL)); + blockPublishingPerformance.blobSidecarsImportCompleted(); } @Override - void publishBlockAndBlobSidecars( - final SignedBeaconBlock block, + void publishBlobSidecars( final List blobSidecars, final BlockPublishingPerformance blockPublishingPerformance) { - blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug(); + blockPublishingPerformance.blobSidecarsPublishingInitiated(); blobSidecarGossipChannel.publishBlobSidecars(blobSidecars).ifExceptionGetsHereRaiseABug(); - blockPublishingPerformance.blockAndBlobSidecarsPublishingInitiated(); } } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java index 82da3f302e2..014417b21ba 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java @@ -15,6 +15,7 @@ import java.util.List; import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; @@ -27,33 +28,51 @@ import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; public class BlockPublisherPhase0 extends AbstractBlockPublisher { - private final BlockGossipChannel blockGossipChannel; public BlockPublisherPhase0( + final AsyncRunner asyncRunner, final BlockFactory blockFactory, final BlockGossipChannel blockGossipChannel, final BlockImportChannel blockImportChannel, final PerformanceTracker performanceTracker, - final DutyMetrics dutyMetrics) { - super(blockFactory, blockImportChannel, performanceTracker, dutyMetrics); - this.blockGossipChannel = blockGossipChannel; + final DutyMetrics dutyMetrics, + final boolean gossipBlobsAfterBlock) { + super( + asyncRunner, + blockFactory, + blockGossipChannel, + blockImportChannel, + performanceTracker, + dutyMetrics, + gossipBlobsAfterBlock); } @Override - SafeFuture importBlockAndBlobSidecars( + SafeFuture importBlock( final SignedBeaconBlock block, - final List blobSidecars, final BroadcastValidationLevel broadcastValidationLevel, final BlockPublishingPerformance blockPublishingPerformance) { return blockImportChannel.importBlock(block, broadcastValidationLevel); } @Override - void publishBlockAndBlobSidecars( - final SignedBeaconBlock block, + void importBlobSidecars( final List blobSidecars, final BlockPublishingPerformance blockPublishingPerformance) { - blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug(); + // No-op for phase 0 + } + + @Override + SafeFuture publishBlock( + final SignedBeaconBlock block, final BlockPublishingPerformance blockPublishingPerformance) { blockPublishingPerformance.blockPublishingInitiated(); + return blockGossipChannel.publishBlock(block); + } + + @Override + void publishBlobSidecars( + final List blobSidecars, + final BlockPublishingPerformance blockPublishingPerformance) { + // No-op for phase 0 } } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/MilestoneBasedBlockPublisher.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/MilestoneBasedBlockPublisher.java index d3dbaf92234..e1cc2024a53 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/MilestoneBasedBlockPublisher.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/MilestoneBasedBlockPublisher.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.function.Supplier; import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; @@ -39,6 +40,7 @@ public class MilestoneBasedBlockPublisher implements BlockPublisher { new EnumMap<>(SpecMilestone.class); public MilestoneBasedBlockPublisher( + final AsyncRunner asyncRunner, final Spec spec, final BlockFactory blockFactory, final BlockImportChannel blockImportChannel, @@ -46,24 +48,33 @@ public MilestoneBasedBlockPublisher( final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final BlobSidecarGossipChannel blobSidecarGossipChannel, final PerformanceTracker performanceTracker, - final DutyMetrics dutyMetrics) { + final DutyMetrics dutyMetrics, + final boolean gossipBlobsAfterBlock) { this.spec = spec; final BlockPublisherPhase0 blockPublisherPhase0 = new BlockPublisherPhase0( - blockFactory, blockGossipChannel, blockImportChannel, performanceTracker, dutyMetrics); + asyncRunner, + blockFactory, + blockGossipChannel, + blockImportChannel, + performanceTracker, + dutyMetrics, + gossipBlobsAfterBlock); // Not needed for all milestones final Supplier blockAndBlobSidecarsPublisherSupplier = Suppliers.memoize( () -> new BlockPublisherDeneb( + asyncRunner, blockFactory, blockImportChannel, blockGossipChannel, blockBlobSidecarsTrackersPool, blobSidecarGossipChannel, performanceTracker, - dutyMetrics)); + dutyMetrics, + gossipBlobsAfterBlock)); // Populate forks publishers spec.getEnabledMilestones() diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/AbstractBlockFactoryTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/AbstractBlockFactoryTest.java index e0197e631cd..a1c18b1f188 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/AbstractBlockFactoryTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/AbstractBlockFactoryTest.java @@ -377,8 +377,7 @@ protected BlockAndBlobSidecars createBlockAndBlobSidecars( when(executionLayer.getCachedUnblindedPayload(signedBlockContainer.getSlot())) .thenReturn(builderPayload.map(BuilderPayloadOrFallbackData::create)); - final List blobSidecars = - blockFactory.createBlobSidecars(signedBlockContainer, BlockPublishingPerformance.NOOP); + final List blobSidecars = blockFactory.createBlobSidecars(signedBlockContainer); return new BlockAndBlobSidecars(signedBlockContainer, blobSidecars); } diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java index 1370795c08e..cd18566109b 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java @@ -775,9 +775,7 @@ void shouldCreateBlobSidecarsForBlockContents() { MiscHelpersDeneb.required(spec.atSlot(signedBlockContents.getSlot()).miscHelpers()); final List blobSidecars = - factory - .createBlobSidecarsSelector(BlockPublishingPerformance.NOOP) - .apply(signedBlockContents); + factory.createBlobSidecarsSelector().apply(signedBlockContents); final SszList expectedBlobs = signedBlockContents.getBlobs().orElseThrow(); final SszList expectedProofs = signedBlockContents.getKzgProofs().orElseThrow(); @@ -821,11 +819,7 @@ void shouldFailCreatingBlobSidecarsIfBuilderBlobsBundleCommitmentsRootIsNotConsi dataStructureUtil.randomExecutionPayload(), blobsBundle); - assertThatThrownBy( - () -> - factory - .createBlobSidecarsSelector(BlockPublishingPerformance.NOOP) - .apply(signedBlindedBeaconBlock)) + assertThatThrownBy(() -> factory.createBlobSidecarsSelector().apply(signedBlindedBeaconBlock)) .isInstanceOf(IllegalStateException.class) .hasMessage( "Commitments in the builder BlobsBundle don't match the commitments in the block"); @@ -846,11 +840,7 @@ void shouldFailCreatingBlobSidecarsIfBuilderBlobsBundleProofsIsNotConsistent() { dataStructureUtil.randomExecutionPayload(), blobsBundle); - assertThatThrownBy( - () -> - factory - .createBlobSidecarsSelector(BlockPublishingPerformance.NOOP) - .apply(signedBlindedBeaconBlock)) + assertThatThrownBy(() -> factory.createBlobSidecarsSelector().apply(signedBlindedBeaconBlock)) .isInstanceOf(IllegalStateException.class) .hasMessage( "The number of blobs in the builder BlobsBundle doesn't match the number of commitments in the block"); @@ -871,11 +861,7 @@ void shouldFailCreatingBlobSidecarsIfBuilderBlobsBundleBlobsIsNotConsistent() { dataStructureUtil.randomExecutionPayload(), blobsBundle); - assertThatThrownBy( - () -> - factory - .createBlobSidecarsSelector(BlockPublishingPerformance.NOOP) - .apply(signedBlindedBeaconBlock)) + assertThatThrownBy(() -> factory.createBlobSidecarsSelector().apply(signedBlindedBeaconBlock)) .isInstanceOf(IllegalStateException.class) .hasMessage( "The number of proofs in the builder BlobsBundle doesn't match the number of commitments in the block"); @@ -908,9 +894,7 @@ void shouldCreateBlobSidecarsForBlindedBlock(final boolean useLocalFallback) { } final List blobSidecars = - factory - .createBlobSidecarsSelector(BlockPublishingPerformance.NOOP) - .apply(signedBlindedBeaconBlock); + factory.createBlobSidecarsSelector().apply(signedBlindedBeaconBlock); final SszList expectedBlobs = blobsBundle.getBlobs(); final SszList expectedProofs = blobsBundle.getProofs(); diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java index ae7fd762114..346eee64205 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java @@ -37,7 +37,6 @@ import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.EQUIVOCATION; import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.GOSSIP; import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.NOT_REQUIRED; -import static tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason.DOES_NOT_DESCEND_FROM_LATEST_FINALIZED; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntSet; @@ -53,7 +52,6 @@ import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import tech.pegasys.teku.api.ChainDataProvider; import tech.pegasys.teku.api.NetworkDataProvider; @@ -80,10 +78,6 @@ import tech.pegasys.teku.infrastructure.ssz.SszMutableList; import tech.pegasys.teku.infrastructure.time.StubTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.kzg.KZGCommitment; -import tech.pegasys.teku.kzg.KZGProof; -import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; -import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; import tech.pegasys.teku.spec.Spec; @@ -92,12 +86,8 @@ import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigAltair; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; -import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer; -import tech.pegasys.teku.spec.datastructures.blocks.versions.deneb.SignedBlockContents; import tech.pegasys.teku.spec.datastructures.builder.SignedValidatorRegistration; import tech.pegasys.teku.spec.datastructures.builder.ValidatorRegistration; import tech.pegasys.teku.spec.datastructures.metadata.BlockContainerAndMetaData; @@ -112,17 +102,10 @@ import tech.pegasys.teku.spec.datastructures.state.CheckpointState; import tech.pegasys.teku.spec.datastructures.state.Validator; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; -import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment; -import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; -import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; import tech.pegasys.teku.spec.logic.common.util.SyncCommitteeUtil; -import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; -import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; -import tech.pegasys.teku.statetransition.block.BlockImportChannel; -import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; @@ -135,6 +118,7 @@ import tech.pegasys.teku.validator.api.SendSignedBlockResult; import tech.pegasys.teku.validator.api.SubmitDataError; import tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker; +import tech.pegasys.teku.validator.coordinator.publisher.BlockPublisher; class ValidatorApiHandlerTest { @@ -149,12 +133,7 @@ class ValidatorApiHandlerTest { private final AttestationTopicSubscriber attestationTopicSubscriptions = mock(AttestationTopicSubscriber.class); private final ActiveValidatorTracker activeValidatorTracker = mock(ActiveValidatorTracker.class); - private final BlockImportChannel blockImportChannel = mock(BlockImportChannel.class); - private final BlockGossipChannel blockGossipChannel = mock(BlockGossipChannel.class); - private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool = - mock(BlockBlobSidecarsTrackersPool.class); - private final BlobSidecarGossipChannel blobSidecarGossipChannel = - mock(BlobSidecarGossipChannel.class); + private final BlockPublisher blockPublisher = mock(BlockPublisher.class); private final DefaultPerformanceTracker performanceTracker = mock(DefaultPerformanceTracker.class); private final ChainDataProvider chainDataProvider = mock(ChainDataProvider.class); @@ -170,14 +149,6 @@ class ValidatorApiHandlerTest { private final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager = mock(SyncCommitteeSubscriptionManager.class); - @SuppressWarnings("unchecked") - private final ArgumentCaptor> blobSidecarsCaptor1 = - ArgumentCaptor.forClass(List.class); - - @SuppressWarnings("unchecked") - private final ArgumentCaptor> blobSidecarsCaptor2 = - ArgumentCaptor.forClass(List.class); - private final BlockProductionAndPublishingPerformanceFactory blockProductionPerformanceFactory = new BlockProductionAndPublishingPerformanceFactory( StubTimeProvider.withTimeInMillis(0), __ -> ZERO, false, 0, 0, 0, 0); @@ -195,9 +166,6 @@ public void setUp() { this.epochStartSlot = spec.computeStartSlotAtEpoch(EPOCH); this.previousEpochStartSlot = spec.computeStartSlotAtEpoch(PREVIOUS_EPOCH); this.dataStructureUtil = new DataStructureUtil(spec); - when(blockGossipChannel.publishBlock(any())).thenReturn(SafeFuture.COMPLETE); - when(blobSidecarGossipChannel.publishBlobSidecar(any())).thenReturn(SafeFuture.COMPLETE); - when(blobSidecarGossipChannel.publishBlobSidecars(any())).thenReturn(SafeFuture.COMPLETE); when(dutyMetrics.getValidatorDutyMetric()) .thenReturn(ValidatorDutyMetricUtils.createValidatorDutyMetric(new StubMetricsSystem())); this.validatorApiHandler = @@ -208,10 +176,6 @@ public void setUp() { chainDataClient, syncStateProvider, blockFactory, - blockImportChannel, - blockGossipChannel, - blockBlobSidecarsTrackersPool, - blobSidecarGossipChannel, attestationPool, attestationManager, attestationTopicSubscriptions, @@ -224,7 +188,8 @@ public void setUp() { syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager, - blockProductionPerformanceFactory); + blockProductionPerformanceFactory, + blockPublisher); when(syncStateProvider.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC); when(forkChoiceTrigger.prepareForBlockProduction(any(), any())).thenReturn(SafeFuture.COMPLETE); @@ -463,10 +428,6 @@ void getSyncCommitteeDuties_shouldNotUseEpochPriorToFork() { chainDataClient, syncStateProvider, blockFactory, - blockImportChannel, - blockGossipChannel, - blockBlobSidecarsTrackersPool, - blobSidecarGossipChannel, attestationPool, attestationManager, attestationTopicSubscriptions, @@ -479,7 +440,8 @@ void getSyncCommitteeDuties_shouldNotUseEpochPriorToFork() { syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager, - blockProductionPerformanceFactory); + blockProductionPerformanceFactory, + blockPublisher); // Best state is still in Phase0 final BeaconState state = dataStructureUtil.stateBuilderPhase0().slot(previousEpochStartSlot.minus(1)).build(); @@ -905,45 +867,27 @@ private ValidatableAttestation validatableAttestationOf(final Attestation validA } @Test - public void sendSignedBlock_shouldConvertSuccessfulResult() { + public void sendSignedBlock_shouldPublish() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(5); - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.successful(block))); + when(blockPublisher.sendSignedBlock(eq(block), eq(NOT_REQUIRED), any())) + .thenReturn(SafeFuture.completedFuture(SendSignedBlockResult.success(block.getRoot()))); final SafeFuture result = validatorApiHandler.sendSignedBlock(block, NOT_REQUIRED); - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); } @Test - public void sendSignedBlock_shouldConvertFailedResult() { + public void sendSignedBlock_shouldCatchPublishFailure() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(5); - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.FAILED_INVALID_ANCESTRY)); + when(blockPublisher.sendSignedBlock(eq(block), eq(NOT_REQUIRED), any())) + .thenReturn(SafeFuture.failedFuture(new RuntimeException("Failed to publish block"))); final SafeFuture result = validatorApiHandler.sendSignedBlock(block, NOT_REQUIRED); - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); assertThat(result) - .isCompletedWithValue( - SendSignedBlockResult.notImported(DOES_NOT_DESCEND_FROM_LATEST_FINALIZED.name())); - } - - @Test - public void sendSignedBlock_shouldConvertKnownBlockResult() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(5); - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.knownBlock(block, false))); - final SafeFuture result = - validatorApiHandler.sendSignedBlock(block, NOT_REQUIRED); - - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); - assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); + .isCompletedWithValue(SendSignedBlockResult.rejected("Failed to publish block")); } @Test @@ -982,8 +926,8 @@ public void sendSignedBlock_shouldConvertKnownBlockResult() { blockContainerAndMetaData.blockContainer().getBlock(), dataStructureUtil.randomSignature()); - when(blockImportChannel.importBlock(block, EQUIVOCATION)) - .thenReturn(prepareBlockImportResult(BlockImportResult.successful(block))); + when(blockPublisher.sendSignedBlock(eq(block), eq(EQUIVOCATION), any())) + .thenReturn(SafeFuture.completedFuture(SendSignedBlockResult.success(block.getRoot()))); // require GOSSIP validation final SafeFuture result = @@ -992,105 +936,7 @@ public void sendSignedBlock_shouldConvertKnownBlockResult() { assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); // for locally created blocks, the validation level should have been changed to EQUIVOCATION - verify(blockImportChannel).importBlock(block, EQUIVOCATION); - } - - @Test - public void sendSignedBlock_shouldConvertBlockContentsSuccessfulResult() { - setupDeneb(); - final SignedBlockContents blockContents = - dataStructureUtil.randomSignedBlockContents(UInt64.valueOf(5)); - final SignedBeaconBlock block = blockContents.getSignedBlock(); - final List expectedBlobSidecars = - BlobSidecarSummary.fromSignedBlockContents(blockContents); - - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.successful(block))); - final SafeFuture result = - validatorApiHandler.sendSignedBlock(blockContents, NOT_REQUIRED); - - verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecarsCaptor1.capture()); - assertThat(BlobSidecarSummary.fromBlobSidecars(blobSidecarsCaptor1.getValue())) - .isEqualTo(expectedBlobSidecars); - verify(blockBlobSidecarsTrackersPool) - .onCompletedBlockAndBlobSidecars(eq(block), blobSidecarsCaptor2.capture()); - assertThat(BlobSidecarSummary.fromBlobSidecars(blobSidecarsCaptor2.getValue())) - .isEqualTo(expectedBlobSidecars); - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); - assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); - } - - @Test - public void sendSignedBlock_shouldConvertBlockContentsFailedResult() { - setupDeneb(); - final SignedBlockContents blockContents = - dataStructureUtil.randomSignedBlockContents(UInt64.valueOf(5)); - final SignedBeaconBlock block = blockContents.getSignedBlock(); - final List expectedBlobSidecars = - BlobSidecarSummary.fromSignedBlockContents(blockContents); - - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.FAILED_INVALID_ANCESTRY)); - final SafeFuture result = - validatorApiHandler.sendSignedBlock(blockContents, NOT_REQUIRED); - - verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecarsCaptor1.capture()); - assertThat(BlobSidecarSummary.fromBlobSidecars(blobSidecarsCaptor1.getValue())) - .isEqualTo(expectedBlobSidecars); - verify(blockBlobSidecarsTrackersPool) - .onCompletedBlockAndBlobSidecars(eq(block), blobSidecarsCaptor2.capture()); - assertThat(BlobSidecarSummary.fromBlobSidecars(blobSidecarsCaptor2.getValue())) - .isEqualTo(expectedBlobSidecars); - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); - assertThat(result) - .isCompletedWithValue( - SendSignedBlockResult.notImported(DOES_NOT_DESCEND_FROM_LATEST_FINALIZED.name())); - } - - @Test - public void sendSignedBlockForDeneb_shouldConvertBlockContentsKnownBlockResult() { - setupDeneb(); - final SignedBlockContents blockContents = - dataStructureUtil.randomSignedBlockContents(UInt64.valueOf(5)); - final SignedBeaconBlock block = blockContents.getSignedBlock(); - final List expectedBlobSidecars = - BlobSidecarSummary.fromSignedBlockContents(blockContents); - - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.knownBlock(block, false))); - final SafeFuture result = - validatorApiHandler.sendSignedBlock(blockContents, NOT_REQUIRED); - - verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecarsCaptor1.capture()); - assertThat(BlobSidecarSummary.fromBlobSidecars(blobSidecarsCaptor1.getValue())) - .isEqualTo(expectedBlobSidecars); - verify(blockBlobSidecarsTrackersPool) - .onCompletedBlockAndBlobSidecars(eq(block), blobSidecarsCaptor2.capture()); - assertThat(BlobSidecarSummary.fromBlobSidecars(blobSidecarsCaptor2.getValue())) - .isEqualTo(expectedBlobSidecars); - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); - assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); - } - - @Test - public void sendSignedBlock_shouldGossipAndImportEmptyBlobSidecarsWhenBlobsDoNotExist() { - setupDeneb(); - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(5); - - when(blockImportChannel.importBlock(block, NOT_REQUIRED)) - .thenReturn(prepareBlockImportResult(BlockImportResult.successful(block))); - final SafeFuture result = - validatorApiHandler.sendSignedBlock(block, NOT_REQUIRED); - safeJoin(result); - - verify(blockBlobSidecarsTrackersPool).onCompletedBlockAndBlobSidecars(block, List.of()); - verify(blobSidecarGossipChannel).publishBlobSidecars(List.of()); - verify(blockGossipChannel).publishBlock(block); - verify(blockImportChannel).importBlock(block, NOT_REQUIRED); - assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot())); + verify(blockPublisher).sendSignedBlock(eq(block), eq(EQUIVOCATION), any()); } @Test @@ -1403,110 +1249,4 @@ private void setupValidatorsState( when(chainDataProvider.getStateValidators("head", validators, new HashSet<>())) .thenReturn(SafeFuture.completedFuture(Optional.of(stateValidators))); } - - private void setupDeneb() { - this.spec = TestSpecFactory.createMinimalDeneb(); - this.epochStartSlot = spec.computeStartSlotAtEpoch(EPOCH); - this.previousEpochStartSlot = spec.computeStartSlotAtEpoch(PREVIOUS_EPOCH); - this.dataStructureUtil = new DataStructureUtil(spec); - this.validatorApiHandler = - new ValidatorApiHandler( - chainDataProvider, - nodeDataProvider, - networkDataProvider, - chainDataClient, - syncStateProvider, - blockFactory, - blockImportChannel, - blockGossipChannel, - blockBlobSidecarsTrackersPool, - blobSidecarGossipChannel, - attestationPool, - attestationManager, - attestationTopicSubscriptions, - activeValidatorTracker, - dutyMetrics, - performanceTracker, - spec, - forkChoiceTrigger, - proposersDataManager, - syncCommitteeMessagePool, - syncCommitteeContributionPool, - syncCommitteeSubscriptionManager, - blockProductionPerformanceFactory); - - // BlobSidecar builder - doAnswer( - invocation -> { - final SignedBlockContainer blockContainer = invocation.getArgument(0); - final MiscHelpersDeneb miscHelpersDeneb = - MiscHelpersDeneb.required(spec.forMilestone(SpecMilestone.DENEB).miscHelpers()); - if (blockContainer.getBlobs().isEmpty()) { - return List.of(); - } - final SszList blobs = blockContainer.getBlobs().orElseThrow(); - final SszList proofs = blockContainer.getKzgProofs().orElseThrow(); - return IntStream.range(0, blobs.size()) - .mapToObj( - index -> - miscHelpersDeneb.constructBlobSidecar( - blockContainer.getSignedBlock(), - UInt64.valueOf(index), - blobs.get(index), - proofs.get(index))) - .toList(); - }) - .when(blockFactory) - .createBlobSidecars(any(), any()); - } - - private SafeFuture prepareBlockImportResult( - final BlockImportResult blockImportResult) { - return SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - SafeFuture.completedFuture(blockImportResult))); - } - - private record BlobSidecarSummary( - Blob blob, KZGProof kzgProof, KZGCommitment commitment, Bytes32 blockRoot) { - static List fromSignedBlockContents( - final SignedBlockContents signedBlockContents) { - final List blobs = signedBlockContents.getBlobs().orElseThrow().asList(); - final List proofs = - signedBlockContents.getKzgProofs().orElseThrow().stream() - .map(SszKZGProof::getKZGProof) - .toList(); - final List commitments = - signedBlockContents - .getSignedBlock() - .getMessage() - .getBody() - .getOptionalBlobKzgCommitments() - .orElseThrow() - .stream() - .map(SszKZGCommitment::getKZGCommitment) - .toList(); - return IntStream.range(0, blobs.size()) - .mapToObj( - index -> - new BlobSidecarSummary( - blobs.get(index), - proofs.get(index), - commitments.get(index), - signedBlockContents.getRoot())) - .toList(); - } - - static List fromBlobSidecars(final List blobSidecars) { - return blobSidecars.stream() - .map( - blobSidecar -> - new BlobSidecarSummary( - blobSidecar.getBlob(), - blobSidecar.getKZGProof(), - blobSidecar.getKZGCommitment(), - blobSidecar.getBlockRoot())) - .toList(); - } - } } diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java index 7fd4c543dad..50391494269 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java @@ -13,6 +13,8 @@ package tech.pegasys.teku.validator.coordinator.publisher; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -24,7 +26,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; +import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; @@ -32,6 +37,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.versions.deneb.SignedBlockContents; import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; +import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.block.BlockImportChannel; import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults; @@ -42,9 +48,11 @@ import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; public class AbstractBlockPublisherTest { + private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); private final Spec spec = TestSpecFactory.createMinimalDeneb(); private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); private final BlockFactory blockFactory = mock(BlockFactory.class); + private final BlockGossipChannel blockGossipChannel = mock(BlockGossipChannel.class); private final BlockImportChannel blockImportChannel = mock(BlockImportChannel.class); private final PerformanceTracker performanceTracker = mock(PerformanceTracker.class); private final DutyMetrics dutyMetrics = mock(DutyMetrics.class); @@ -52,7 +60,13 @@ public class AbstractBlockPublisherTest { private final AbstractBlockPublisher blockPublisher = spy( new BlockPublisherTest( - blockFactory, blockImportChannel, performanceTracker, dutyMetrics)); + asyncRunner, + blockFactory, + blockGossipChannel, + blockImportChannel, + performanceTracker, + dutyMetrics, + false)); final SignedBlockContents signedBlockContents = dataStructureUtil.randomSignedBlockContents(); final SignedBeaconBlock signedBlock = signedBlockContents.getSignedBlock(); @@ -60,25 +74,19 @@ public class AbstractBlockPublisherTest { @BeforeEach public void setUp() { + when(blockPublisher.publishBlock(any(), any())).thenReturn(SafeFuture.COMPLETE); when(blockFactory.unblindSignedBlockIfBlinded(signedBlock, BlockPublishingPerformance.NOOP)) .thenReturn(SafeFuture.completedFuture(signedBlock)); - when(blockFactory.createBlobSidecars(signedBlockContents, BlockPublishingPerformance.NOOP)) - .thenReturn(blobSidecars); + when(blockFactory.createBlobSidecars(signedBlockContents)).thenReturn(blobSidecars); } @Test public void sendSignedBlock_shouldPublishImmediatelyAndImportWhenBroadcastValidationIsNotRequired() { - when(blockPublisher.importBlockAndBlobSidecars( - signedBlock, - blobSidecars, - BroadcastValidationLevel.NOT_REQUIRED, - BlockPublishingPerformance.NOOP)) - .thenReturn( - SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - SafeFuture.completedFuture(BlockImportResult.successful(signedBlock))))); + when(blockPublisher.importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP)) + .thenReturn(prepareBlockImportResult(BlockImportResult.successful(signedBlock))); assertThatSafeFuture( blockPublisher.sendSignedBlock( @@ -87,29 +95,23 @@ public void setUp() { BlockPublishingPerformance.NOOP)) .isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot())); + verify(blockPublisher).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); + verify(blockPublisher).publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); verify(blockPublisher) - .publishBlockAndBlobSidecars(signedBlock, blobSidecars, BlockPublishingPerformance.NOOP); - verify(blockPublisher) - .importBlockAndBlobSidecars( - signedBlock, - blobSidecars, - BroadcastValidationLevel.NOT_REQUIRED, - BlockPublishingPerformance.NOOP); + .importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP); + verify(blockPublisher).importBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); } @Test public void sendSignedBlock_shouldWaitToPublishWhenBroadcastValidationIsSpecified() { final SafeFuture validationResult = new SafeFuture<>(); - when(blockPublisher.importBlockAndBlobSidecars( + when(blockPublisher.importBlock( signedBlock, - blobSidecars, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, BlockPublishingPerformance.NOOP)) .thenReturn( - SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - SafeFuture.completedFuture(BlockImportResult.successful(signedBlock)), - validationResult))); + prepareBlockImportResult(BlockImportResult.successful(signedBlock), validationResult)); final SafeFuture sendSignedBlockResult = blockPublisher.sendSignedBlock( @@ -118,21 +120,25 @@ public void sendSignedBlock_shouldWaitToPublishWhenBroadcastValidationIsSpecifie BlockPublishingPerformance.NOOP); assertThatSafeFuture(sendSignedBlockResult).isNotCompleted(); + assertThat(asyncRunner.hasDelayedActions()).isTrue(); + verify(blockPublisher, never()).importBlobSidecars(any(), any()); + asyncRunner.executeDueActions(); + verify(blockPublisher).importBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); verify(blockPublisher) - .importBlockAndBlobSidecars( + .importBlock( signedBlock, - blobSidecars, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, BlockPublishingPerformance.NOOP); + verify(blockPublisher, never()).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); verify(blockPublisher, never()) - .publishBlockAndBlobSidecars(signedBlock, blobSidecars, BlockPublishingPerformance.NOOP); + .publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); validationResult.complete(BroadcastValidationResult.SUCCESS); - verify(blockPublisher) - .publishBlockAndBlobSidecars(signedBlock, blobSidecars, BlockPublishingPerformance.NOOP); + verify(blockPublisher).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); + verify(blockPublisher).publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); assertThatSafeFuture(sendSignedBlockResult) .isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot())); } @@ -140,9 +146,8 @@ public void sendSignedBlock_shouldWaitToPublishWhenBroadcastValidationIsSpecifie @Test public void sendSignedBlock_shouldNotPublishWhenBroadcastValidationFails() { final SafeFuture validationResult = new SafeFuture<>(); - when(blockPublisher.importBlockAndBlobSidecars( + when(blockPublisher.importBlock( signedBlock, - blobSidecars, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, BlockPublishingPerformance.NOOP)) .thenReturn( @@ -157,48 +162,152 @@ public void sendSignedBlock_shouldNotPublishWhenBroadcastValidationFails() { BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, BlockPublishingPerformance.NOOP); + assertThat(asyncRunner.hasDelayedActions()).isTrue(); assertThatSafeFuture(sendSignedBlockResult).isNotCompleted(); verify(blockPublisher) - .importBlockAndBlobSidecars( + .importBlock( signedBlock, - blobSidecars, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, BlockPublishingPerformance.NOOP); + verify(blockPublisher, never()).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); verify(blockPublisher, never()) - .publishBlockAndBlobSidecars(signedBlock, blobSidecars, BlockPublishingPerformance.NOOP); + .publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); validationResult.complete(BroadcastValidationResult.CONSENSUS_FAILURE); + verify(blockPublisher, never()).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); verify(blockPublisher, never()) - .publishBlockAndBlobSidecars(signedBlock, blobSidecars, BlockPublishingPerformance.NOOP); + .publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + assertThatSafeFuture(sendSignedBlockResult) .isCompletedWithValue( SendSignedBlockResult.rejected("FAILED_BROADCAST_VALIDATION: CONSENSUS_FAILURE")); } + @Test + public void sendSignedBlock_shouldPublishBlobsAfterBlockWhenOptionIsEnabled() { + final AbstractBlockPublisher blockPublisher = + spy( + new BlockPublisherTest( + asyncRunner, + blockFactory, + blockGossipChannel, + blockImportChannel, + performanceTracker, + dutyMetrics, + true)); + + SafeFuture publishBlockFuture = new SafeFuture<>(); + when(blockPublisher.publishBlock(any(), any())).thenReturn(publishBlockFuture); + + when(blockPublisher.importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP)) + .thenReturn(prepareBlockImportResult(BlockImportResult.successful(signedBlock))); + + assertThatSafeFuture( + blockPublisher.sendSignedBlock( + signedBlockContents, + BroadcastValidationLevel.NOT_REQUIRED, + BlockPublishingPerformance.NOOP)) + .isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot())); + + verify(blockPublisher).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); + verify(blockPublisher, never()) + .publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + + // Complete block publishing + publishBlockFuture.complete(null); + + verify(blockPublisher).publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + + verify(blockPublisher) + .importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP); + verify(blockPublisher).importBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + } + + @Test + public void sendSignedBlock_shouldReturnNotImportedWhenBlockImportFails() { + when(blockPublisher.importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP)) + .thenReturn( + prepareBlockImportResult( + BlockImportResult.failedStateTransition(new RuntimeException("Failed")))); + + assertThatSafeFuture( + blockPublisher.sendSignedBlock( + signedBlockContents, + BroadcastValidationLevel.NOT_REQUIRED, + BlockPublishingPerformance.NOOP)) + .isCompletedWithValue( + SendSignedBlockResult.notImported(FailureReason.FAILED_STATE_TRANSITION.name())); + + verify(blockPublisher).publishBlock(signedBlock, BlockPublishingPerformance.NOOP); + verify(blockPublisher).publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + verify(blockPublisher) + .importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP); + verify(blockPublisher).importBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + } + + private SafeFuture prepareBlockImportResult( + final BlockImportResult blockImportResult) { + return SafeFuture.completedFuture( + new BlockImportAndBroadcastValidationResults( + SafeFuture.completedFuture(blockImportResult))); + } + + private SafeFuture prepareBlockImportResult( + final BlockImportResult blockImportResult, + final SafeFuture broadcastValidationResult) { + return SafeFuture.completedFuture( + new BlockImportAndBroadcastValidationResults( + SafeFuture.completedFuture(blockImportResult), broadcastValidationResult)); + } + private static class BlockPublisherTest extends AbstractBlockPublisher { public BlockPublisherTest( + final AsyncRunner asyncRunner, final BlockFactory blockFactory, + final BlockGossipChannel blockGossipChannel, final BlockImportChannel blockImportChannel, final PerformanceTracker performanceTracker, - final DutyMetrics dutyMetrics) { - super(blockFactory, blockImportChannel, performanceTracker, dutyMetrics); + final DutyMetrics dutyMetrics, + final boolean gossipBlobsAfterBlock) { + super( + asyncRunner, + blockFactory, + blockGossipChannel, + blockImportChannel, + performanceTracker, + dutyMetrics, + gossipBlobsAfterBlock); } @Override - SafeFuture importBlockAndBlobSidecars( + SafeFuture importBlock( final SignedBeaconBlock block, - final List blobSidecars, final BroadcastValidationLevel broadcastValidationLevel, final BlockPublishingPerformance blockPublishingPerformance) { return null; } @Override - void publishBlockAndBlobSidecars( + void importBlobSidecars( + final List blobSidecars, + final BlockPublishingPerformance blockPublishingPerformance) {} + + @Override + SafeFuture publishBlock( final SignedBeaconBlock block, + final BlockPublishingPerformance blockPublishingPerformance) { + return null; + } + + @Override + void publishBlobSidecars( final List blobSidecars, final BlockPublishingPerformance blockPublishingPerformance) {} } diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDenebTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDenebTest.java new file mode 100644 index 00000000000..b2e3d9f270c --- /dev/null +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDenebTest.java @@ -0,0 +1,76 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.coordinator.publisher; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel; +import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin; +import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; +import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.validator.coordinator.BlockFactory; +import tech.pegasys.teku.validator.coordinator.DutyMetrics; +import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; + +class BlockPublisherDenebTest { + private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool = + mock(BlockBlobSidecarsTrackersPool.class); + private final BlobSidecarGossipChannel blobSidecarGossipChannel = + mock(BlobSidecarGossipChannel.class); + private final BlockPublisherDeneb blockPublisherDeneb = + new BlockPublisherDeneb( + mock(AsyncRunner.class), + mock(BlockFactory.class), + mock(BlockImportChannel.class), + mock(BlockGossipChannel.class), + blockBlobSidecarsTrackersPool, + blobSidecarGossipChannel, + mock(PerformanceTracker.class), + mock(DutyMetrics.class), + true); + + private final BlobSidecar blobSidecar = mock(BlobSidecar.class); + private final List blobSidecars = List.of(blobSidecar); + + @BeforeEach + void setUp() { + when(blobSidecarGossipChannel.publishBlobSidecars(any())).thenReturn(SafeFuture.COMPLETE); + } + + @Test + void importBlobSidecars_shouldTrackBlobSidecars() { + blockPublisherDeneb.importBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + + verify(blockBlobSidecarsTrackersPool) + .onNewBlobSidecar(blobSidecar, RemoteOrigin.LOCAL_PROPOSAL); + } + + @Test + void publishBlobSidecars_shouldPublishBlobSidecars() { + blockPublisherDeneb.publishBlobSidecars(blobSidecars, BlockPublishingPerformance.NOOP); + + verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecars); + } +} diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0Test.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0Test.java new file mode 100644 index 00000000000..a5c95d61471 --- /dev/null +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0Test.java @@ -0,0 +1,72 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.coordinator.publisher; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; +import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.validator.coordinator.BlockFactory; +import tech.pegasys.teku.validator.coordinator.DutyMetrics; +import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; + +class BlockPublisherPhase0Test { + private final BlockGossipChannel blockGossipChannel = mock(BlockGossipChannel.class); + private final BlockImportChannel blockImportChannel = mock(BlockImportChannel.class); + + private final BlockPublisherPhase0 blockPublisherPhase0 = + new BlockPublisherPhase0( + mock(AsyncRunner.class), + mock(BlockFactory.class), + blockGossipChannel, + blockImportChannel, + mock(PerformanceTracker.class), + mock(DutyMetrics.class), + false); + + private final SignedBeaconBlock signedBlock = mock(SignedBeaconBlock.class); + + @BeforeEach + void setUp() { + when(blockGossipChannel.publishBlock(signedBlock)).thenReturn(SafeFuture.COMPLETE); + when(blockImportChannel.importBlock(signedBlock, BroadcastValidationLevel.NOT_REQUIRED)) + .thenReturn(SafeFuture.completedFuture(null)); + } + + @Test + void importBlock_shouldImportBlock() { + safeJoin( + blockPublisherPhase0.importBlock( + signedBlock, BroadcastValidationLevel.NOT_REQUIRED, BlockPublishingPerformance.NOOP)); + + verify(blockImportChannel).importBlock(signedBlock, BroadcastValidationLevel.NOT_REQUIRED); + } + + @Test + void publishBlock_shouldPublishBlock() { + safeJoin(blockPublisherPhase0.publishBlock(signedBlock, BlockPublishingPerformance.NOOP)); + + verify(blockGossipChannel).publishBlock(signedBlock); + } +} diff --git a/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformance.java b/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformance.java index fe9e468ec08..e55dd1f9e9a 100644 --- a/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformance.java +++ b/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformance.java @@ -29,16 +29,19 @@ public void builderGetPayload() {} public void blobSidecarsPrepared() {} @Override - public void blockAndBlobSidecarsPublishingInitiated() {} + public void blobSidecarsPublishingInitiated() {} @Override public void blockPublishingInitiated() {} @Override public void blockImportCompleted() {} + + @Override + public void blobSidecarsImportCompleted() {} }; - void blockAndBlobSidecarsPublishingInitiated(); + void blobSidecarsPublishingInitiated(); void blockPublishingInitiated(); @@ -46,6 +49,8 @@ public void blockImportCompleted() {} void blobSidecarsPrepared(); + void blobSidecarsImportCompleted(); + void blockImportCompleted(); void complete(); diff --git a/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformanceImpl.java b/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformanceImpl.java index 9772817cfef..6461a8a0cae 100644 --- a/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformanceImpl.java +++ b/ethereum/performance-trackers/src/main/java/tech/pegasys/teku/ethereum/performance/trackers/BlockPublishingPerformanceImpl.java @@ -67,8 +67,8 @@ public void blobSidecarsPrepared() { } @Override - public void blockAndBlobSidecarsPublishingInitiated() { - performanceTracker.addEvent("block_and_blob_sidecars_publishing_initiated"); + public void blobSidecarsImportCompleted() { + performanceTracker.addEvent("blob_sidecars_imported"); } @Override @@ -76,6 +76,11 @@ public void blockPublishingInitiated() { performanceTracker.addEvent("block_publishing_initiated"); } + @Override + public void blobSidecarsPublishingInitiated() { + performanceTracker.addEvent("blob_sidecars_publishing_initiated"); + } + @Override public void blockImportCompleted() { performanceTracker.addEvent("block_import_completed"); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java index 6481d9bb8f1..7bfea3926be 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java @@ -80,6 +80,7 @@ interface ReceivedBlobSidecarListener { enum RemoteOrigin { RPC, GOSSIP, - LOCAL_EL + LOCAL_EL, + LOCAL_PROPOSAL } } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java index 0ce020a1b75..1a62ef5f45c 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java @@ -42,7 +42,7 @@ default SafeFuture importBlock( default SafeFuture importBlock( final SignedBeaconBlock block, final BroadcastValidationLevel broadcastValidationLevel) { - return importBlock(block, broadcastValidationLevel, Optional.empty()); + return importBlock(block, broadcastValidationLevel, Optional.of(RemoteOrigin.LOCAL_PROPOSAL)); } record BlockImportAndBroadcastValidationResults( diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 95582777962..5be4941c9ec 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis; import static tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin.LOCAL_EL; +import static tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin.LOCAL_PROPOSAL; import com.google.common.annotations.VisibleForTesting; import java.time.Duration; @@ -77,10 +78,12 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis static final String COUNTER_GOSSIP_SUBTYPE = "gossip"; static final String COUNTER_LOCAL_EL_SUBTYPE = "local_el"; + static final String COUNTER_LOCAL_PROPOSAL_SUBTYPE = "local_proposal"; static final String COUNTER_RPC_SUBTYPE = "rpc"; static final String COUNTER_GOSSIP_DUPLICATE_SUBTYPE = "gossip_duplicate"; static final String COUNTER_RPC_DUPLICATE_SUBTYPE = "rpc_duplicate"; static final String COUNTER_LOCAL_EL_DUPLICATE_SUBTYPE = "local_el_duplicate"; + static final String COUNTER_LOCAL_PROPOSAL_DUPLICATE_SUBTYPE = "local_proposal_duplicate"; static final String COUNTER_RPC_FETCH_SUBTYPE = "rpc_fetch"; static final String COUNTER_LOCAL_EL_FETCH_SUBTYPE = "local_el_fetch"; @@ -226,7 +229,9 @@ public synchronized void onNewBlobSidecar( final BlockBlobSidecarsTracker blobSidecarsTracker = getOrCreateBlobSidecarsTracker( - slotAndBlockRoot, newTracker -> onFirstSeen(slotAndBlockRoot), existingTracker -> {}); + slotAndBlockRoot, + newTracker -> onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)), + existingTracker -> {}); if (blobSidecarsTracker.add(blobSidecar)) { sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL); @@ -256,6 +261,8 @@ private void countBlobSidecar(final RemoteOrigin origin) { case GOSSIP -> poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_GOSSIP_SUBTYPE).inc(); case LOCAL_EL -> poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_LOCAL_EL_SUBTYPE).inc(); + case LOCAL_PROPOSAL -> + poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_LOCAL_PROPOSAL_SUBTYPE).inc(); } } @@ -267,6 +274,10 @@ private void countDuplicateBlobSidecar(final RemoteOrigin origin) { poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_GOSSIP_DUPLICATE_SUBTYPE).inc(); case LOCAL_EL -> poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_LOCAL_EL_DUPLICATE_SUBTYPE).inc(); + case LOCAL_PROPOSAL -> + poolStatsCounters + .labels(COUNTER_SIDECAR_TYPE, COUNTER_LOCAL_PROPOSAL_DUPLICATE_SUBTYPE) + .inc(); } } @@ -469,7 +480,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( newTracker -> { newTracker.setBlock(block); countBlock(remoteOrigin); - onFirstSeen(slotAndBlockRoot); + onFirstSeen(slotAndBlockRoot, remoteOrigin); }, existingTracker -> { if (!existingTracker.setBlock(block)) { @@ -520,6 +531,8 @@ private void countBlock(final Optional maybeRemoteOrigin) { case GOSSIP -> poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_GOSSIP_SUBTYPE).inc(); case LOCAL_EL -> {} // only possible for blobs + case LOCAL_PROPOSAL -> + poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_LOCAL_PROPOSAL_SUBTYPE).inc(); } }); } @@ -534,6 +547,10 @@ private void countDuplicateBlock(final Optional maybeRemoteOrigin) poolStatsCounters .labels(COUNTER_BLOCK_TYPE, COUNTER_GOSSIP_DUPLICATE_SUBTYPE) .inc(); + case LOCAL_PROPOSAL -> + poolStatsCounters + .labels(COUNTER_BLOCK_TYPE, COUNTER_LOCAL_PROPOSAL_DUPLICATE_SUBTYPE) + .inc(); case LOCAL_EL -> {} // only possible for blobs } }); @@ -566,7 +583,14 @@ private void makeRoomForNewTracker() { } } - private void onFirstSeen(final SlotAndBlockRoot slotAndBlockRoot) { + private void onFirstSeen( + final SlotAndBlockRoot slotAndBlockRoot, final Optional remoteOrigin) { + final boolean isLocalBlockProduction = + remoteOrigin.map(ro -> ro.equals(LOCAL_PROPOSAL)).orElse(false); + if (isLocalBlockProduction) { + return; + } + final Duration fetchDelay = calculateFetchDelay(slotAndBlockRoot); asyncRunner diff --git a/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java b/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java index 3f34074a982..da671e493cc 100644 --- a/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java +++ b/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.infrastructure.time; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; import org.apache.commons.lang3.tuple.Pair; @@ -22,7 +23,8 @@ public class PerformanceTracker { private final TimeProvider timeProvider; - protected final List> events = new ArrayList<>(); + protected final List> events = + Collections.synchronizedList(new ArrayList<>()); public PerformanceTracker(final TimeProvider timeProvider) { this.timeProvider = timeProvider; diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java index 60aec96d20e..8a422fa767c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java @@ -38,6 +38,7 @@ public class P2PConfig { public static final int DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT = 2; public static final boolean DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED = false; public static final boolean DEFAULT_GOSSIP_SCORING_ENABLED = true; + public static final boolean DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED = false; public static final int DEFAULT_BATCH_VERIFY_MAX_THREADS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2); public static final int DEFAULT_BATCH_VERIFY_QUEUE_CAPACITY = 15_000; @@ -59,7 +60,7 @@ public class P2PConfig { private final int batchVerifyQueueCapacity; private final int batchVerifyMaxBatchSize; private final boolean batchVerifyStrictThreadLimitEnabled; - + private final boolean isGossipBlobsAfterBlockEnabled; private final boolean allTopicsFilterEnabled; private P2PConfig( @@ -76,7 +77,8 @@ private P2PConfig( final int batchVerifyQueueCapacity, final int batchVerifyMaxBatchSize, final boolean batchVerifyStrictThreadLimitEnabled, - final boolean allTopicsFilterEnabled) { + final boolean allTopicsFilterEnabled, + final boolean isGossipBlobsAfterBlockEnabled) { this.spec = spec; this.networkConfig = networkConfig; this.discoveryConfig = discoveryConfig; @@ -92,6 +94,7 @@ private P2PConfig( this.batchVerifyStrictThreadLimitEnabled = batchVerifyStrictThreadLimitEnabled; this.networkingSpecConfig = spec.getNetworkingConfig(); this.allTopicsFilterEnabled = allTopicsFilterEnabled; + this.isGossipBlobsAfterBlockEnabled = isGossipBlobsAfterBlockEnabled; } public static Builder builder() { @@ -158,6 +161,10 @@ public boolean isAllTopicsFilterEnabled() { return allTopicsFilterEnabled; } + public boolean isGossipBlobsAfterBlockEnabled() { + return isGossipBlobsAfterBlockEnabled; + } + public static class Builder { private final NetworkConfig.Builder networkConfig = NetworkConfig.builder(); private final DiscoveryConfig.Builder discoveryConfig = DiscoveryConfig.builder(); @@ -177,6 +184,7 @@ public static class Builder { private boolean allTopicsFilterEnabled = DEFAULT_PEER_ALL_TOPIC_FILTER_ENABLED; private int floodPublishMaxMessageSizeThreshold = DEFAULT_FLOOD_PUBLISH_MAX_MESSAGE_SIZE_THRESHOLD; + private boolean gossipBlobsAfterBlockEnabled = DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED; private Builder() {} @@ -223,7 +231,8 @@ public P2PConfig build() { batchVerifyQueueCapacity.orElse(DEFAULT_BATCH_VERIFY_QUEUE_CAPACITY), batchVerifyMaxBatchSize, batchVerifyStrictThreadLimitEnabled, - allTopicsFilterEnabled); + allTopicsFilterEnabled, + gossipBlobsAfterBlockEnabled); } private void validate() { @@ -294,6 +303,11 @@ public Builder floodPublishMaxMessageSizeThreshold( return this; } + public Builder gossipBlobsAfterBlockEnabled(final boolean gossipBlobsAfterBlockEnabled) { + this.gossipBlobsAfterBlockEnabled = gossipBlobsAfterBlockEnabled; + return this; + } + public Builder batchVerifyMaxThreads(final int batchVerifyMaxThreads) { if (batchVerifyMaxThreads < 0) { throw new InvalidConfigurationException( diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 084e2e6b6ae..091e0348554 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -207,6 +207,7 @@ import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker; import tech.pegasys.teku.validator.coordinator.performance.SyncCommitteePerformanceTracker; import tech.pegasys.teku.validator.coordinator.performance.ValidatorPerformanceMetrics; +import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher; import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityCalculator; import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityValidator; @@ -968,6 +969,22 @@ public void initValidatorApiHandler() { beaconConfig.getMetricsConfig().getBlockPublishingPerformanceWarningLocalThreshold(), beaconConfig.getMetricsConfig().getBlockPublishingPerformanceWarningBuilderThreshold()); + final DutyMetrics dutyMetrics = + DutyMetrics.create(metricsSystem, timeProvider, recentChainData, spec); + + final MilestoneBasedBlockPublisher blockPublisher = + new MilestoneBasedBlockPublisher( + beaconAsyncRunner, + spec, + blockFactory, + blockImportChannel, + blockGossipChannel, + blockBlobSidecarsTrackersPool, + blobSidecarGossipChannel, + performanceTracker, + dutyMetrics, + beaconConfig.p2pConfig().isGossipBlobsAfterBlockEnabled()); + final ValidatorApiHandler validatorApiHandler = new ValidatorApiHandler( new ChainDataProvider(spec, recentChainData, combinedChainDataClient, rewardCalculator), @@ -976,15 +993,11 @@ public void initValidatorApiHandler() { combinedChainDataClient, syncService, blockFactory, - blockImportChannel, - blockGossipChannel, - blockBlobSidecarsTrackersPool, - blobSidecarGossipChannel, attestationPool, attestationManager, attestationTopicSubscriber, activeValidatorTracker, - DutyMetrics.create(metricsSystem, timeProvider, recentChainData, spec), + dutyMetrics, performanceTracker, spec, forkChoiceTrigger, @@ -992,7 +1005,8 @@ public void initValidatorApiHandler() { syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager, - blockProductionPerformanceFactory); + blockProductionPerformanceFactory, + blockPublisher); eventChannels .subscribe(SlotEventsChannel.class, activeValidatorTracker) .subscribe(ExecutionClientEventsChannel.class, executionClientVersionProvider) diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java index e268b2da55e..5065a63c5da 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java @@ -291,6 +291,17 @@ The network interface(s) on which the node listens for P2P communication. hidden = true) private Integer peerRateLimit = P2PConfig.DEFAULT_PEER_RATE_LIMIT; + @Option( + names = {"--Xp2p-gossip-blobs-after-block-enabled"}, + paramLabel = "", + showDefaultValue = Visibility.ALWAYS, + description = + "Enables experimental behaviour in which blobs are gossiped after the block has been gossiped to at least one peer.", + hidden = true, + arity = "0..1", + fallbackValue = "true") + private boolean gossipBlobsAfterBlockEnabled = P2PConfig.DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED; + @Option( names = {"--Xpeer-all-topics-filter-enabled"}, paramLabel = "", @@ -410,7 +421,8 @@ public void configure(final TekuConfiguration.Builder builder) { .peerRateLimit(peerRateLimit) .allTopicsFilterEnabled(allTopicsFilterEnabled) .peerRequestLimit(peerRequestLimit) - .floodPublishMaxMessageSizeThreshold(floodPublishMaxMessageSizeThreshold); + .floodPublishMaxMessageSizeThreshold(floodPublishMaxMessageSizeThreshold) + .gossipBlobsAfterBlockEnabled(gossipBlobsAfterBlockEnabled); batchVerifyQueueCapacity.ifPresent(b::batchVerifyQueueCapacity); }) .discovery( diff --git a/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java b/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java index be720223370..766dec5453c 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory.DEFAULT_MAX_QUEUE_SIZE_ALL_SUBNETS; +import static tech.pegasys.teku.networking.eth2.P2PConfig.DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED; import static tech.pegasys.teku.networking.p2p.discovery.DiscoveryConfig.DEFAULT_P2P_PEERS_LOWER_BOUND_ALL_SUBNETS; import static tech.pegasys.teku.networking.p2p.discovery.DiscoveryConfig.DEFAULT_P2P_PEERS_UPPER_BOUND_ALL_SUBNETS; import static tech.pegasys.teku.networking.p2p.gossip.config.GossipConfig.DEFAULT_FLOOD_PUBLISH_MAX_MESSAGE_SIZE_THRESHOLD; @@ -354,6 +355,34 @@ public void floodPublishMaxMessageSizeThreshold_isSetCorrectly() { .isEqualTo(1000); } + @Test + public void gossipBlobsAfterBlockEnabled_defaultIsSetCorrectly() { + final TekuConfiguration config = getTekuConfigurationFromArguments(); + assertThat(config.p2p().isGossipBlobsAfterBlockEnabled()) + .isEqualTo(DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED); + } + + @Test + public void gossipBlobsAfterBlockEnabled_shouldNotRequireAValue() { + final TekuConfiguration config = + getTekuConfigurationFromArguments("--Xp2p-gossip-blobs-after-block-enabled"); + assertThat(config.p2p().isGossipBlobsAfterBlockEnabled()).isTrue(); + } + + @Test + public void gossipBlobsAfterBlockEnabled_true() { + final TekuConfiguration config = + getTekuConfigurationFromArguments("--Xp2p-gossip-blobs-after-block-enabled=true"); + assertThat(config.p2p().isGossipBlobsAfterBlockEnabled()).isTrue(); + } + + @Test + public void gossipBlobsAfterBlockEnabled_false() { + final TekuConfiguration config = + getTekuConfigurationFromArguments("--Xp2p-gossip-blobs-after-block-enabled=false"); + assertThat(config.p2p().isGossipBlobsAfterBlockEnabled()).isFalse(); + } + @Test public void defaultPortsAreSetCorrectly() { final TekuConfiguration tekuConfiguration = getTekuConfigurationFromArguments();