Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix snapsync heal #5838

Merged
merged 12 commits into from
Sep 18, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
.getAccountToRepair()
.ifPresent(
address ->
snapContext.addAccountsToBeRepaired(
snapContext.addAccountToHealingList(
CompactEncoding.bytesToPath(address.addressHash())));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static Optional<FastSyncDownloader<?>> createSnapDownloader(
.getAccountToRepair()
.ifPresent(
address ->
snapContext.addAccountsToBeRepaired(
snapContext.addAccountToHealingList(
CompactEncoding.bytesToPath(address.addressHash())));
} else if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
Expand All @@ -43,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -72,7 +72,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>

protected final InMemoryTasksPriorityQueues<SnapDataRequest>
pendingStorageFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>();
private HashSet<Bytes> accountsToBeRepaired = new HashSet<>();
private HashSet<Bytes> accountsHealingList = new HashSet<>();
private DynamicPivotBlockSelector pivotBlockSelector;

private final SnapSyncStatePersistenceManager snapContext;
Expand Down Expand Up @@ -156,6 +156,7 @@ protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
@Override
public synchronized boolean checkCompletion(final BlockHeader header) {

// Check if all snapsync tasks are completed
if (!internalFuture.isDone()
&& pendingAccountRequests.allTasksCompleted()
&& pendingCodeRequests.allTasksCompleted()
Expand All @@ -164,29 +165,50 @@ public synchronized boolean checkCompletion(final BlockHeader header) {
&& pendingTrieNodeRequests.allTasksCompleted()
&& pendingAccountFlatDatabaseHealingRequests.allTasksCompleted()
&& pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) {

// if all snapsync tasks are completed and the healing process was not running
if (!snapSyncState.isHealTrieInProgress()) {
// Register blockchain observer if not already registered
blockObserverId =
blockObserverId.isEmpty()
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
: blockObserverId;
// Start the healing process
startTrieHeal();
} else if (pivotBlockSelector.isBlockchainBehind()) {
}
// if all snapsync tasks are completed and the healing was running and blockchain is behind
// the pivot block
else if (pivotBlockSelector.isBlockchainBehind()) {
LOG.info("Pausing world state download while waiting for sync to complete");
if (blockObserverId.isEmpty())
blockObserverId = OptionalLong.of(blockchain.observeBlockAdded(getBlockAddedListener()));
// Set the snapsync to wait for the blockchain to catch up
snapSyncState.setWaitingBlockchain(true);
} else if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// only doing a flat db heal for bonsai
startFlatDatabaseHeal(header);
} else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
metricsManager.notifySnapSyncCompleted();
snapContext.clear();
internalFuture.complete(null);

return true;
}
// if all snapsync tasks are completed and the healing was running and the blockchain is not
// behind the pivot block
else {
// Remove the blockchain observer
blockObserverId.ifPresent(blockchain::removeObserver);
// If the flat database healing process is not in progress and the flat database mode is
// FULL
if (!snapSyncState.isHealFlatDatabaseInProgress()
&& worldStateStorage.getFlatDbMode().equals(FlatDbMode.FULL)) {
// Start the flat database healing process
startFlatDatabaseHeal(header);
}
// If the flat database healing process is in progress or the flat database mode is not FULL
else {
final WorldStateStorage.Updater updater = worldStateStorage.updater();
updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
updater.commit();
// Notify that the snap sync has completed
metricsManager.notifySnapSyncCompleted();
// Clear the snap context
snapContext.clear();
internalFuture.complete(null);
return true;
}
}
}

return false;
}

Expand All @@ -200,10 +222,11 @@ protected synchronized void cleanupQueues() {
pendingTrieNodeRequests.clear();
}

/** Method to start the healing process of the trie */
public synchronized void startTrieHeal() {
snapContext.clearAccountRangeTasks();
snapSyncState.setHealTrieStatus(true);
// try to find new pivot block before healing
// Try to find a new pivot block before starting the healing process
pivotBlockSelector.switchToNewPivotBlock(
(blockHeader, newPivotBlockFound) -> {
snapContext.clearAccountRangeTasks();
Expand All @@ -212,21 +235,25 @@ public synchronized void startTrieHeal() {
blockHeader.getNumber());
enqueueRequest(
createAccountTrieNodeDataRequest(
blockHeader.getStateRoot(), Bytes.EMPTY, accountsToBeRepaired));
blockHeader.getStateRoot(), Bytes.EMPTY, accountsHealingList));
});
}

/** Method to reload the healing process of the trie */
public synchronized void reloadTrieHeal() {
// Clear the flat database and trie log from the world state storage if needed
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
// Clear pending trie node and code requests
pendingTrieNodeRequests.clear();
pendingCodeRequests.clear();

snapSyncState.setHealTrieStatus(false);
checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
}

public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
LOG.info("Running flat database heal process");
LOG.info("Initiating the healing process for the flat database");
snapSyncState.setHealFlatDatabaseInProgress(true);
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
ranges.forEach(
Expand All @@ -235,10 +262,6 @@ public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
createAccountFlatHealingRangeRequest(header.getStateRoot(), key, value)));
}

public boolean isBonsaiStorageFormat() {
return worldStateStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI);
}

@Override
public synchronized void enqueueRequest(final SnapDataRequest request) {
if (!internalFuture.isDone()) {
Expand All @@ -263,8 +286,8 @@ public synchronized void enqueueRequest(final SnapDataRequest request) {
}
}

public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsToBeRepaired) {
this.accountsToBeRepaired = accountsToBeRepaired;
public synchronized void setAccountsHealingList(final HashSet<Bytes> addAccountToHealingList) {
this.accountsHealingList = addAccountToHealingList;
}

/**
Expand All @@ -274,15 +297,15 @@ public synchronized void setAccountsToBeRepaired(final HashSet<Bytes> accountsTo
*
* @param account The account to be added for repair.
*/
public synchronized void addAccountsToBeRepaired(final Bytes account) {
if (!accountsToBeRepaired.contains(account)) {
snapContext.addAccountsToBeRepaired(account);
accountsToBeRepaired.add(account);
public synchronized void addAccountToHealingList(final Bytes account) {
if (!accountsHealingList.contains(account)) {
snapContext.addAccountToHealingList(account);
accountsHealingList.add(account);
}
}

public HashSet<Bytes> getAccountsToBeRepaired() {
return accountsToBeRepaired;
public HashSet<Bytes> getAccountsHealingList() {

Check failure

Code scanning / CodeQL

Inconsistent synchronization of getter and setter

This get method is unsynchronized, but the corresponding [set method](1) is synchronized.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore this one because it's not in the scope of this PR. I'm just renaming the method in this PR . Don't want to modify the logic

return accountsHealingList;
}

@Override
Expand Down Expand Up @@ -385,25 +408,25 @@ public void setPivotBlockSelector(final DynamicPivotBlockSelector pivotBlockSele
this.pivotBlockSelector = pivotBlockSelector;
}

public BlockAddedObserver getBlockAddedListener() {
public BlockAddedObserver createBlockchainObserver() {
return addedBlockContext -> {
if (snapSyncState.isWaitingBlockchain()) {
// if we receive a new pivot block we can restart the heal
pivotBlockSelector.check(
(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
snapSyncState.setWaitingBlockchain(false);
}
});
// if we are close to the head we can also restart the heal and finish snapsync
if (!pivotBlockSelector.isBlockchainBehind()) {
snapSyncState.setWaitingBlockchain(false);
}
if (!snapSyncState.isWaitingBlockchain()) {
blockObserverId.ifPresent(blockchain::removeObserver);
blockObserverId = OptionalLong.empty();
reloadTrieHeal();
}
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
pivotBlockSelector.check(
Copy link
Contributor

@garyschulte garyschulte Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it not a problem that check executes async code when it determines that the pivot block is too far behind ? For example could we incorrectly return no new pivot if we have no peers.

The async code in DynamicPivotBlockSelector makes this hard to reason about. This PR doesn't make this worse, but still hard to follow.

Copy link
Contributor Author

@matkt matkt Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, the code for the Pivot Block Selector is a bit complex to understand and could benefit from more detailed descriptions.

This class fetches pivot blocks in advance to avoid having to search at the time of the call. I don't have the exact numbers, but for example:

I call the check method and if the distance to the head is >60 and <126, I will cache a pivot block. I won't use it right now. On the next call, when the distance to the head is >126, I will take the one cached. This means that every time we call this method and we are in this range(>60 and <126), we are looking for a pivot block to put in a cache , This is to have fast access to the next pivot block

In conclusion, there will be no problem as it will be synchronous (every time we will take the pivot from cache).

Copy link
Contributor Author

@matkt matkt Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would probably be beneficial to simplify this pivot block selection code in the future as it's more technical debt. Previously, obtaining a pivot block required querying 5 peers and reaching a consensus, which could be time-consuming. Now, it's simply a matter of taking the last safe block. Therefore, this logic may no longer be necessary.

(____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
foundNewPivotBlock.set(true);
}
});

final boolean isNewPivotBlockFound = foundNewPivotBlock.get();
final boolean isBlockchainCaughtUp =
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();

if (isNewPivotBlockFound
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to
// head again
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ public CompletableFuture<Void> run(

final List<AccountRangeDataRequest> currentAccountRange =
snapContext.getCurrentAccountRange();
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsToBeRepaired();
final HashSet<Bytes> inconsistentAccounts = snapContext.getAccountsHealingList();

if (!currentAccountRange.isEmpty()) { // continue to download worldstate ranges
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
newDownloadState.setAccountsHealingList(inconsistentAccounts);
snapContext
.getCurrentAccountRange()
.forEach(
Expand All @@ -165,14 +165,14 @@ public CompletableFuture<Void> run(
DOWNLOAD, snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
newDownloadState.enqueueRequest(snapDataRequest);
});
} else if (!snapContext.getAccountsToBeRepaired().isEmpty()) { // restart only the heal step
} else if (!snapContext.getAccountsHealingList().isEmpty()) { // restart only the heal step
snapSyncState.setHealTrieStatus(true);
worldStateStorage.clearFlatDatabase();
worldStateStorage.clearTrieLog();
newDownloadState.setAccountsToBeRepaired(inconsistentAccounts);
newDownloadState.setAccountsHealingList(inconsistentAccounts);
newDownloadState.enqueueRequest(
SnapDataRequest.createAccountTrieNodeDataRequest(
stateRoot, Bytes.EMPTY, snapContext.getAccountsToBeRepaired()));
stateRoot, Bytes.EMPTY, snapContext.getAccountsHealingList()));
} else {
// start from scratch
worldStateStorage.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public class SnapSyncStatePersistenceManager {

private final byte[] SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX =
private final byte[] SNAP_ACCOUNT_HEALING_LIST_INDEX =
"snapInconsistentAccountsStorageIndex".getBytes(StandardCharsets.UTF_8);

private final GenericKeyValueStorageFacade<BigInteger, AccountRangeDataRequest>
Expand Down Expand Up @@ -104,20 +104,20 @@ public void updatePersistedTasks(final List<? extends SnapDataRequest> accountRa
}

/**
* Persists the current accounts to be repaired in the database.
* Persists the current accounts to heal in the database.
*
* @param accountsToBeRepaired The current list of accounts to persist.
* @param accountsHealingList The current list of accounts to heal.
*/
public void addAccountsToBeRepaired(final Bytes accountsToBeRepaired) {
public void addAccountToHealingList(final Bytes accountsHealingList) {
final BigInteger index =
healContext
.get(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX)
.get(SNAP_ACCOUNT_HEALING_LIST_INDEX)
.map(bytes -> new BigInteger(bytes.toArrayUnsafe()).add(BigInteger.ONE))
.orElse(BigInteger.ZERO);
healContext.putAll(
keyValueStorageTransaction -> {
keyValueStorageTransaction.put(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), accountsToBeRepaired.toArrayUnsafe());
keyValueStorageTransaction.put(SNAP_ACCOUNT_HEALING_LIST_INDEX, index.toByteArray());
keyValueStorageTransaction.put(index.toByteArray(), accountsHealingList.toArrayUnsafe());
});
}

Expand All @@ -127,9 +127,9 @@ public List<AccountRangeDataRequest> getCurrentAccountRange() {
.collect(Collectors.toList());
}

public HashSet<Bytes> getAccountsToBeRepaired() {
public HashSet<Bytes> getAccountsHealingList() {
return healContext
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_TO_BE_REPAIRED_INDEX))
.streamValuesFromKeysThat(notEqualsTo(SNAP_ACCOUNT_HEALING_LIST_INDEX))
.collect(Collectors.toCollection(HashSet::new));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public void addResponse(
if (!slots.isEmpty() || !proofs.isEmpty()) {
if (!worldStateProofProvider.isValidRangeProof(
startKeyHash, endKeyHash, storageRoot, proofs, slots)) {
// If the proof is invalid, it means that the storage will be a mix of several blocks.
// Therefore, it will be necessary to heal the account's storage subsequently
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
// We will request the new storage root of the account because it is apparently no longer
// valid with the new pivot block.
downloadState.enqueueRequest(
createAccountDataRequest(
getRootHash(), Hash.wrap(accountHash), startKeyHash, endKeyHash));
Expand Down Expand Up @@ -173,7 +178,7 @@ public Stream<SnapDataRequest> getChildRequests(
});
if (startKeyHash.equals(MIN_RANGE) && endKeyHash.equals(MAX_RANGE)) {
// need to heal this account storage
downloadState.addAccountsToBeRepaired(CompactEncoding.bytesToPath(accountHash));
downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash));
}
});

Expand Down
Loading