Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --X-trie-log subcommand #6303

Merged
merged 27 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
16c0a49
Add x-trie-log subcommand for one-off backlog prune
siladu Nov 20, 2023
7dd4928
long -> int
siladu Nov 20, 2023
bf2b098
Removed banned method
gfukushima Dec 12, 2023
e67ae51
Preload process stream in parallel
gfukushima Dec 12, 2023
9b4e0c9
Drop unwanted trielogs and keep reatain layers only
gfukushima Dec 14, 2023
0b9fe83
Add output to user and cleanup refactor
gfukushima Dec 15, 2023
426848e
small tweak to display cf that had reference dropped by RocksDbSegmen…
gfukushima Dec 15, 2023
7401b59
spotless
gfukushima Dec 15, 2023
1b7fb72
Fix classes that changed package
gfukushima Dec 15, 2023
11e6b05
spotless
gfukushima Dec 15, 2023
f2d01e2
Code review
gfukushima Dec 15, 2023
04f1aaa
Only clear DB when we have the exact amount of trie logs we want in m…
gfukushima Dec 15, 2023
2f01c5a
Trielogs stream to and from file to avoid possibly OOM
gfukushima Dec 18, 2023
56e4c8e
Process trie logs in chunks to avoid OOM
gfukushima Dec 18, 2023
78561b0
save and read in batches to handle edge cases
gfukushima Dec 19, 2023
42c72cf
save and read files to/from database dir
gfukushima Dec 20, 2023
9961fc2
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Dec 20, 2023
9389540
add unit tests and PR review fixes
gfukushima Dec 21, 2023
e3d4fbc
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Dec 21, 2023
c7144fe
spdx
gfukushima Dec 21, 2023
20b0ba5
Fix unit tests directory creation and deletion
gfukushima Dec 21, 2023
e7d175c
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 3, 2024
b214bf2
PR review
gfukushima Jan 4, 2024
aa75348
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 4, 2024
2bc0732
Merge remote-tracking branch 'origin/x-trie-log-subcommand-2' into x-…
gfukushima Jan 4, 2024
deec021
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 8, 2024
5d7b68e
Merge branch 'main' into x-trie-log-subcommand-2
gfukushima Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Trielogs stream to and from file to avoid possibly OOM
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
  • Loading branch information
gfukushima committed Dec 18, 2023
commit 2f01c5a4d03e231f0f018b06d661b60683d2cfd7
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,20 @@
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Splitter;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.bouncycastle.util.encoders.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,21 +51,25 @@ static void prune(
final PrintWriter out,
final DataStorageConfiguration config,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final MutableBlockchain blockchain)
throws IOException {
final MutableBlockchain blockchain) {

TrieLogHelper.validatePruneConfiguration(config);
final long layersToRetain = config.getUnstable().getBonsaiTrieLogRetentionThreshold();
final long chainHeight = blockchain.getChainHeadBlockNumber();
final long lastBlockToRetainTrieLogsFor = chainHeight - layersToRetain;
final long lastBlockNumberToRetainTrieLogsFor = chainHeight - layersToRetain;
if (lastBlockNumberToRetainTrieLogsFor < 0) {
LOG.error(
"Trying to retain more trie logs than chain height ({}), skipping pruning", chainHeight);
return;
}
final Optional<Hash> finalizedBlockHash = blockchain.getFinalized();

if (finalizedBlockHash.isEmpty()) {
LOG.error("No finalized block present, skipping pruning");
return;
} else {
if (blockchain.getBlockHeader(finalizedBlockHash.get()).get().getNumber()
< lastBlockToRetainTrieLogsFor) {
< lastBlockNumberToRetainTrieLogsFor) {
LOG.error("Trying to prune more layers than the finalized block height, skipping pruning");
return;
}
Expand All @@ -79,9 +78,11 @@ static void prune(
// retrieve the layersToRetains hashes from blockchain
final List<Hash> trieLogKeys = new ArrayList<>();

for (long i = chainHeight; i > lastBlockToRetainTrieLogsFor; i--) {
for (long i = chainHeight; i > lastBlockNumberToRetainTrieLogsFor; i--) {
final Optional<BlockHeader> header = blockchain.getBlockHeader(i);
header.ifPresent(blockHeader -> trieLogKeys.add(blockHeader.getHash()));
header.ifPresentOrElse(
blockHeader -> trieLogKeys.add(blockHeader.getHash()),
() -> LOG.error("Error retrieving block"));
}

IdentityHashMap<byte[], byte[]> trieLogsToRetain;
Expand All @@ -98,11 +99,21 @@ static void prune(
.ifPresent(trieLog -> trieLogsToRetain.put(hash.toArrayUnsafe(), trieLog));
});
out.println("Saving trielogs to retain in file...");
saveTrieLogsInFile(trieLogsToRetain);
try {
saveTrieLogsInFile(trieLogsToRetain);
} catch (IOException e) {
LOG.error("Error saving trielogs to file: {}", e.getMessage());
return;
}
} else {
// in case something went wrong and we already pruned trielogs
// users can re-un the subcommand and we will read trielogs from file
trieLogsToRetain = readTrieLogsFromFile();
try {
trieLogsToRetain = readTrieLogsFromFile();
} catch (Exception e) {
LOG.error("Error reading trielogs from file: {}", e.getMessage());
return;
}
}

if (trieLogsToRetain.size() == layersToRetain) {
Expand All @@ -114,7 +125,7 @@ static void prune(
if (rootWorldStateStorage.streamTrieLogKeys(layersToRetain).count() == layersToRetain) {
out.println("Prune ran successfully. Deleting file...");
deleteTrieLogFile();
out.println("Enjoy some GBs of storage back!...");
out.println("Enjoy some GBs of storage back!");
} else {
out.println("Prune failed. Re-run the subcommand to load the trielogs from file.");
}
Expand Down Expand Up @@ -188,35 +199,38 @@ static TrieLogCount getCount(
return new TrieLogCount(total.get(), canonicalCount.get(), forkCount.get(), orphanCount.get());
}

private static void saveTrieLogsInFile(final Map<byte[], byte[]> trieLogs) throws IOException {
private static void saveTrieLogsInFile(final IdentityHashMap<byte[], byte[]> trieLogs)
throws IOException {

File file = new File(trieLogFile);

try (BufferedWriter bf = new BufferedWriter(new FileWriter(file, StandardCharsets.UTF_8))) {
for (Map.Entry<byte[], byte[]> entry : trieLogs.entrySet()) {
bf.write(Bytes.of(entry.getKey()) + ":" + Base64.toBase64String(entry.getValue()));
bf.newLine();
if (file.exists()) {
LOG.error("File {} already exists, something went terribly wrong", trieLogFile);
}
try (FileOutputStream fos = new FileOutputStream(trieLogFile)) {
try {
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(trieLogs);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
bf.flush();
} catch (IOException e) {
LOG.error(e.getMessage());
throw e;
}
}

private static IdentityHashMap<byte[], byte[]> readTrieLogsFromFile() throws IOException {
@SuppressWarnings("unchecked")
private static IdentityHashMap<byte[], byte[]> readTrieLogsFromFile()
throws IOException, ClassNotFoundException {

IdentityHashMap<byte[], byte[]> trieLogs;
try (FileInputStream fis = new FileInputStream(trieLogFile);
ObjectInputStream ois = new ObjectInputStream(fis)) {

trieLogs = (IdentityHashMap<byte[], byte[]>) ois.readObject();

} catch (IOException | ClassNotFoundException e) {

File file = new File(trieLogFile);
IdentityHashMap<byte[], byte[]> trieLogs = new IdentityHashMap<>();
try (BufferedReader br = new BufferedReader(new FileReader(file, StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
List<String> parts = Splitter.on(':').splitToList(line);
byte[] key = Bytes.fromHexString(parts.get(0)).toArrayUnsafe();
byte[] value = Base64.decode(parts.get(1));
trieLogs.put(key, value);
}
} catch (IOException e) {
LOG.error(e.getMessage());
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,11 @@ static class PruneTrieLog implements Runnable {
public void run() {
TrieLogContext context = getTrieLogContext();

try {
TrieLogHelper.prune(
spec.commandLine().getOut(),
context.config(),
context.rootWorldStateStorage(),
context.blockchain());
} catch (Exception e) {
throw new RuntimeException(e);
}
TrieLogHelper.prune(
spec.commandLine().getOut(),
context.config(),
context.rootWorldStateStorage(),
context.blockchain());
}
}

Expand Down
Loading