Skip to content

Commit

Permalink
Introduce BlobsSidecar storage (#6587)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Dec 14, 2022
1 parent 2eda229 commit 61bb752
Show file tree
Hide file tree
Showing 28 changed files with 957 additions and 46 deletions.
12 changes: 12 additions & 0 deletions ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayload;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadHeader;
import tech.pegasys.teku.spec.datastructures.execution.versions.capella.Withdrawal;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.forkchoice.MutableStore;
import tech.pegasys.teku.spec.datastructures.forkchoice.ReadOnlyForkChoiceStrategy;
import tech.pegasys.teku.spec.datastructures.forkchoice.ReadOnlyStore;
Expand Down Expand Up @@ -304,6 +305,17 @@ public ExecutionPayload deserializeExecutionPayload(
.sszDeserialize(serializedPayload);
}

public BlobsSidecar deserializeBlobsSidecar(
final Bytes serializedBlobsSidecar, final UInt64 slot) {
return atSlot(slot)
.getSchemaDefinitions()
.toVersionEip4844()
.orElseThrow(
() -> new RuntimeException("Eip4844 milestone is required to load execution payload"))
.getBlobsSidecarSchema()
.sszDeserialize(serializedBlobsSidecar);
}

public ExecutionPayloadHeader deserializeJsonExecutionPayloadHeader(
final ObjectMapper objectMapper, final File jsonFile, final UInt64 slot) throws IOException {
return atSlot(slot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package tech.pegasys.teku.spec.datastructures.blocks;

import com.google.common.base.MoreObjects;
import java.util.Comparator;
import java.util.Objects;
import org.apache.tuweni.bytes.Bytes32;
import org.jetbrains.annotations.NotNull;
import tech.pegasys.teku.infrastructure.logging.LogFormatter;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class SlotAndBlockRoot {
public class SlotAndBlockRoot implements Comparable<SlotAndBlockRoot> {
private final UInt64 slot;
private final Bytes32 blockRoot;

Expand Down Expand Up @@ -52,6 +54,13 @@ public boolean equals(final Object o) {
return Objects.equals(slot, that.slot) && Objects.equals(blockRoot, that.blockRoot);
}

@Override
public int compareTo(@NotNull SlotAndBlockRoot o) {
return Comparator.comparing(SlotAndBlockRoot::getSlot)
.thenComparing(SlotAndBlockRoot::getBlockRoot)
.compare(this, o);
}

@Override
public int hashCode() {
return Objects.hash(slot, blockRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;

Expand Down Expand Up @@ -76,4 +77,6 @@ SafeFuture<Optional<StateAndBlockSummary>> getHotStateAndBlockSummaryByBlockRoot
SafeFuture<Optional<Checkpoint>> getAnchor();

SafeFuture<Optional<DepositTreeSnapshot>> getFinalizedDepositSnapshot();

SafeFuture<Optional<BlobsSidecar>> getBlobsSidecar(final SlotAndBlockRoot slotAndBlockRoot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import tech.pegasys.teku.ethereum.pow.api.DepositTreeSnapshot;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;

Expand All @@ -36,5 +39,13 @@ public interface StorageUpdateChannel extends ChannelInterface {

SafeFuture<Void> onFinalizedDepositSnapshot(DepositTreeSnapshot depositTreeSnapshot);

SafeFuture<Void> onBlobsSidecar(BlobsSidecar blobsSidecar);

SafeFuture<Void> onBlobsSidecarRemoval(SlotAndBlockRoot blobsSidecarKey);

SafeFuture<Void> onBlobsSidecarPruning(UInt64 endSlot, int pruneLimit);

SafeFuture<Void> onUnconfirmedBlobsSidecarPruning(UInt64 endSlot, int pruneLimit);

void onChainInitialized(AnchorPoint initialAnchor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayload;
import tech.pegasys.teku.spec.datastructures.execution.SlotAndExecutionPayloadSummary;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.forkchoice.VoteTracker;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class DatabaseTest {

private static final List<BLSKeyPair> VALIDATOR_KEYS = BLSKeyGenerator.generateKeyPairs(3);

protected final Spec spec = TestSpecFactory.createMinimalBellatrix();
protected final Spec spec = TestSpecFactory.createMinimalEip4844();
final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);
private final ChainBuilder chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS);
private final ChainProperties chainProperties = new ChainProperties(spec);
Expand Down Expand Up @@ -182,6 +183,140 @@ public void updateWeakSubjectivityState_setValue(final DatabaseContext context)
assertThat(database.getWeakSubjectivityState().getCheckpoint()).contains(checkpoint);
}

@TestTemplate
public void verifyBlobsLifecycle(final DatabaseContext context) throws IOException {
initialize(context);

// no blobs, no early slot
assertThat(database.getEarliestBlobsSidecarSlot()).isEmpty();

final BlobsSidecar blobsSidecar1 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(1));
final BlobsSidecar blobsSidecar2 =
dataStructureUtil.randomBlobsSidecar(Bytes32.ZERO, UInt64.valueOf(2));
final BlobsSidecar blobsSidecar2bis =
dataStructureUtil.randomBlobsSidecar(Bytes32.fromHexString("0x01"), UInt64.valueOf(2));
final BlobsSidecar blobsSidecar3 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(3));
final BlobsSidecar blobsSidecar4 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(4));
final BlobsSidecar blobsSidecarNotAdded = dataStructureUtil.randomBlobsSidecar();

// add blobs out of order
database.storeUnconfirmedBlobsSidecar(blobsSidecar2);
database.storeUnconfirmedBlobsSidecar(blobsSidecar1);
database.storeUnconfirmedBlobsSidecar(blobsSidecar2bis);
database.storeUnconfirmedBlobsSidecar(blobsSidecar4);
database.storeUnconfirmedBlobsSidecar(blobsSidecar3);

assertThat(database.getEarliestBlobsSidecarSlot()).contains(ONE);

// all added blobs must be there
List.of(blobsSidecar1, blobsSidecar2, blobsSidecar2bis, blobsSidecar3, blobsSidecar4)
.forEach(
blobsSidecar ->
assertThat(database.getBlobsSidecar(blobsSidecarToSlotAndBlockRoot(blobsSidecar)))
.contains(blobsSidecar));

// non added blobs must not be there
assertThat(database.getBlobsSidecar(blobsSidecarToSlotAndBlockRoot(blobsSidecarNotAdded)))
.isEmpty();

// all blobs must be streamed ordered by slot
assertBlobsStream(blobsSidecar1, blobsSidecar2, blobsSidecar2bis, blobsSidecar3, blobsSidecar4);

// a subset of blobs must be streamed ordered by slot
assertBlobsStream(
UInt64.valueOf(2), UInt64.valueOf(3), blobsSidecar2, blobsSidecar2bis, blobsSidecar3);

// all blobs must be unconfirmed
assertUnconfirmedBlobsStream(
blobsSidecarToSlotAndBlockRoot(blobsSidecar1),
blobsSidecarToSlotAndBlockRoot(blobsSidecar2),
blobsSidecarToSlotAndBlockRoot(blobsSidecar2bis),
blobsSidecarToSlotAndBlockRoot(blobsSidecar3),
blobsSidecarToSlotAndBlockRoot(blobsSidecar4));

database.confirmBlobsSidecar(blobsSidecarToSlotAndBlockRoot(blobsSidecar4));

// only 1 and 3 blobs must be unconfirmed
assertUnconfirmedBlobsStream(
blobsSidecarToSlotAndBlockRoot(blobsSidecar1),
blobsSidecarToSlotAndBlockRoot(blobsSidecar2),
blobsSidecarToSlotAndBlockRoot(blobsSidecar2bis),
blobsSidecarToSlotAndBlockRoot(blobsSidecar3));
// we still have all blobs
assertBlobsStream(blobsSidecar1, blobsSidecar2, blobsSidecar2bis, blobsSidecar3, blobsSidecar4);

// let's prune unconfirmed with limit to 1
assertThat(database.pruneOldestUnconfirmedBlobsSidecar(UInt64.MAX_VALUE, 1)).isTrue();
assertUnconfirmedBlobsStream(
blobsSidecarToSlotAndBlockRoot(blobsSidecar2),
blobsSidecarToSlotAndBlockRoot(blobsSidecar2bis),
blobsSidecarToSlotAndBlockRoot(blobsSidecar3));
// we have all blobs except the first
assertBlobsStream(blobsSidecar2, blobsSidecar2bis, blobsSidecar3, blobsSidecar4);

// let's prune unconfirmed up to slot 1 (nothing will be pruned)
assertThat(database.pruneOldestUnconfirmedBlobsSidecar(ONE, 10)).isFalse();
assertUnconfirmedBlobsStream(
blobsSidecarToSlotAndBlockRoot(blobsSidecar2),
blobsSidecarToSlotAndBlockRoot(blobsSidecar2bis),
blobsSidecarToSlotAndBlockRoot(blobsSidecar3));
// we still have all blobs except the first
assertBlobsStream(blobsSidecar2, blobsSidecar2bis, blobsSidecar3, blobsSidecar4);

// let's prune all unconfirmed
assertThat(database.pruneOldestUnconfirmedBlobsSidecar(UInt64.valueOf(3), 10)).isFalse();
assertUnconfirmedBlobsStream();
// we have blobsSidecar4
assertBlobsStream(blobsSidecar4);

// let's prune all up to a too old slot (nothing will be pruned)
assertThat(database.pruneOldestBlobsSidecar(UInt64.valueOf(3), 10)).isFalse();
assertBlobsStream(blobsSidecar4);

// let's prune all up slot 4
assertThat(database.pruneOldestBlobsSidecar(UInt64.valueOf(4), 1)).isTrue();
// all empty now
assertUnconfirmedBlobsStream();
assertBlobsStream();
}

@TestTemplate
void pruneOldestBlobsSidecar_shouldPruneUnconfirmedBlobsToo(final DatabaseContext context)
throws IOException {
initialize(context);

// no blobs, no early slot
assertThat(database.getEarliestBlobsSidecarSlot()).isEmpty();

final BlobsSidecar blobsSidecar1 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(1));
final BlobsSidecar blobsSidecar2 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(2));
final BlobsSidecar blobsSidecar3 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(3));
final BlobsSidecar blobsSidecar4 =
dataStructureUtil.randomBlobsSidecar(dataStructureUtil.randomBytes32(), UInt64.valueOf(4));

// add unconfirmed blobs
database.storeUnconfirmedBlobsSidecar(blobsSidecar2);
database.storeUnconfirmedBlobsSidecar(blobsSidecar1);
database.storeUnconfirmedBlobsSidecar(blobsSidecar4);
database.storeUnconfirmedBlobsSidecar(blobsSidecar3);

assertThat(database.pruneOldestBlobsSidecar(UInt64.MAX_VALUE, 2)).isTrue();
assertUnconfirmedBlobsStream(
blobsSidecarToSlotAndBlockRoot(blobsSidecar3),
blobsSidecarToSlotAndBlockRoot(blobsSidecar4));
assertBlobsStream(blobsSidecar3, blobsSidecar4);

assertThat(database.pruneOldestBlobsSidecar(UInt64.MAX_VALUE, 100)).isFalse();
assertUnconfirmedBlobsStream();
assertBlobsStream();
}

@TestTemplate
public void updateWeakSubjectivityState_clearValue(final DatabaseContext context)
throws IOException {
Expand Down Expand Up @@ -2127,6 +2262,31 @@ private void setDefaultStorage(final StorageSystem storageSystem) {
storageSystems.add(storageSystem);
}

private void assertUnconfirmedBlobsStream(SlotAndBlockRoot... slotAndBlockRoots) {
assertUnconfirmedBlobsStream(ZERO, UInt64.MAX_VALUE, slotAndBlockRoots);
}

private void assertUnconfirmedBlobsStream(
UInt64 startSlot, UInt64 endSlot, SlotAndBlockRoot... slotAndBlockRoots) {
try (Stream<SlotAndBlockRoot> blobsSidecarStream =
database.streamUnconfirmedBlobsSidecar(startSlot, endSlot)) {
final List<SlotAndBlockRoot> allSlotAndBlockRoots = blobsSidecarStream.collect(toList());
assertThat(allSlotAndBlockRoots).containsExactly(slotAndBlockRoots);
}
}

private void assertBlobsStream(BlobsSidecar... blobsSidecars) {
assertBlobsStream(ZERO, UInt64.MAX_VALUE, blobsSidecars);
}

private void assertBlobsStream(UInt64 startSlot, UInt64 endSlot, BlobsSidecar... blobsSidecars) {
try (Stream<BlobsSidecar> blobsSidecarStream =
database.streamBlobsSidecar(startSlot, endSlot)) {
final List<BlobsSidecar> allBlobs = blobsSidecarStream.collect(toList());
assertThat(allBlobs).containsExactly(blobsSidecars);
}
}

public static class CreateForkChainResult {
private final ChainBuilder forkChain;
private final UInt64 firstHotBlockSlot;
Expand All @@ -2144,4 +2304,9 @@ public UInt64 getFirstHotBlockSlot() {
return firstHotBlockSlot;
}
}

private static SlotAndBlockRoot blobsSidecarToSlotAndBlockRoot(final BlobsSidecar blobsSidecar) {
return new SlotAndBlockRoot(
blobsSidecar.getBeaconBlockSlot(), blobsSidecar.getBeaconBlockRoot());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayload;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadSummary;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.forkchoice.VoteTracker;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
Expand Down Expand Up @@ -173,6 +174,43 @@ public SafeFuture<Optional<SignedBeaconBlock>> getFinalizedBlockAtSlot(final UIn
.thenCompose(this::unblindBlock);
}

@Override
public SafeFuture<Void> onBlobsSidecar(final BlobsSidecar blobsSidecar) {
return SafeFuture.of(
() -> {
database.storeUnconfirmedBlobsSidecar(blobsSidecar);
return null;
});
}

@Override
public SafeFuture<Void> onBlobsSidecarRemoval(final SlotAndBlockRoot blobsSidecarKey) {
return SafeFuture.of(
() -> {
database.removeBlobsSidecar(blobsSidecarKey);
return null;
});
}

@Override
public SafeFuture<Void> onBlobsSidecarPruning(final UInt64 endSlot, final int pruneLimit) {
return SafeFuture.of(
() -> {
database.pruneOldestBlobsSidecar(endSlot, pruneLimit);
return null;
});
}

@Override
public SafeFuture<Void> onUnconfirmedBlobsSidecarPruning(
final UInt64 endSlot, final int pruneLimit) {
return SafeFuture.of(
() -> {
database.pruneOldestUnconfirmedBlobsSidecar(endSlot, pruneLimit);
return null;
});
}

private SafeFuture<Optional<SignedBeaconBlock>> unblindBlock(
final Optional<SignedBeaconBlock> maybeBlock) {
if (maybeBlock.isEmpty()) {
Expand Down Expand Up @@ -318,4 +356,10 @@ public void onVotesUpdated(final Map<UInt64, VoteTracker> votes) {
public SafeFuture<Optional<DepositTreeSnapshot>> getFinalizedDepositSnapshot() {
return SafeFuture.of(database::getFinalizedDepositSnapshot);
}

@Override
public SafeFuture<Optional<BlobsSidecar>> getBlobsSidecar(
final SlotAndBlockRoot slotAndBlockRoot) {
return SafeFuture.of(() -> database.getBlobsSidecar(slotAndBlockRoot));
}
}
Loading

0 comments on commit 61bb752

Please sign in to comment.