core/rawdb: freezer batch should implement the offset commit, ref bnb-chain/bsc#1005

Signed-off-by: Delweng <delweng@gmail.com>
jsvisa authored and manav2401 committed Apr 9, 2024
1 parent 4a18c6c commit cf08b90
Showing 6 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion core/rawdb/freezer.go
@@ -469,7 +469,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
return err
}
var (
batch = newTable.newBatch()
batch = newTable.newBatch(f.offset)
out []byte
start = time.Now()
logged = time.Now()
11 changes: 6 additions & 5 deletions core/rawdb/freezer_batch.go
@@ -36,7 +36,7 @@ type freezerBatch struct {
func newFreezerBatch(f *Freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
batch.tables[kind] = table.newBatch(f.offset)
}

return batch
@@ -94,11 +94,12 @@ type freezerTableBatch struct {
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
offset uint64
}

// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
func (t *freezerTable) newBatch(offset uint64) *freezerTableBatch {
batch := &freezerTableBatch{t: t, offset: offset}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
@@ -112,7 +113,7 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = batch.t.items.Load()
batch.curItem = batch.t.items.Load() + batch.offset
batch.totalBytes = 0
}

@@ -222,7 +223,7 @@ func (batch *freezerTableBatch) commit() error {

// Update headBytes of table.
batch.t.headBytes += dataSize
batch.t.items.Store(batch.curItem)
batch.t.items.Store(batch.curItem - batch.offset)

// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
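The core of this change is the new `offset` field on `freezerTableBatch`: callers address batch items with global item numbers (pruned items included), while the table itself only counts the entries it physically stores, so `reset` adds the offset and `commit` subtracts it again. The following is a minimal, self-contained sketch of that bookkeeping; the `table`/`tableBatch` types and `newBatch`/`append` helpers here are simplified stand-ins for illustration, not the actual freezer implementation.

```go
package main

import "fmt"

type table struct {
	items uint64 // number of items physically stored in this table
}

type tableBatch struct {
	t       *table
	curItem uint64 // expected global number of the next appended item
	offset  uint64 // items pruned away before this table's first entry
}

func newBatch(t *table, offset uint64) *tableBatch {
	b := &tableBatch{t: t, offset: offset}
	b.reset()
	return b
}

// reset translates the table's local item count into a global item number.
func (b *tableBatch) reset() {
	b.curItem = b.t.items + b.offset
}

// append expects callers to pass globally numbered items.
func (b *tableBatch) append(item uint64) error {
	if item != b.curItem {
		return fmt.Errorf("appending unexpected item: want %d, have %d", b.curItem, item)
	}
	b.curItem++
	return nil
}

// commit translates back: the table only tracks the items it actually holds.
func (b *tableBatch) commit() {
	b.t.items = b.curItem - b.offset
}

func main() {
	// A table whose first 100 items live in an older, already pruned store.
	t := &table{items: 0}
	b := newBatch(t, 100)
	_ = b.append(100) // global numbering starts at the offset
	_ = b.append(101)
	b.commit()
	fmt.Println(t.items) // 2 items stored locally
}
```

With an offset of 100, the first valid append is item 100, and after committing two items the table reports two locally stored entries, which mirrors the `reset`/`commit` adjustments in the diff above.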
2 changes: 1 addition & 1 deletion core/rawdb/freezer_table.go
@@ -1068,7 +1068,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
// Fill adds empty data till given number (convenience method for backward compatibility)
func (t *freezerTable) Fill(number uint64) error {
if t.items.Load() < number {
b := t.newBatch()
b := t.newBatch(0)
log.Info("Filling all data into freezer for backward compatibility", "name", t.name, "items", &t.items, "number", number)

for t.items.Load() < number {
26 changes: 13 additions & 13 deletions core/rawdb/freezer_table_test.go
@@ -96,7 +96,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
// In-between writes, the table is closed and re-opened.
for x := 0; x < 255; x++ {
data := getChunk(15, x)
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(uint64(x), data))
require.NoError(t, batch.commit())
f.Close()
@@ -234,7 +234,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Errorf("Expected error for missing index entry")
}
// We should now be able to store items again, from item = 1
batch := f.newBatch()
batch := f.newBatch(0)
for x := 1; x < 0xff; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
@@ -443,7 +443,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
t.Fatal(err)
}
// Write 80 bytes, splitting out into two files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE)))
require.NoError(t, batch.commit())
@@ -485,7 +485,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
}

// Write 40 bytes
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD)))
require.NoError(t, batch.commit())

@@ -546,7 +546,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
f.truncateHead(0)

// Write the data again
batch := f.newBatch()
batch := f.newBatch(0)
for x := 0; x < 30; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
@@ -569,7 +569,7 @@ func TestFreezerOffset(t *testing.T) {
}

// Write 6 x 20 bytes, splitting out into three files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))

@@ -636,7 +636,7 @@ func TestFreezerOffset(t *testing.T) {
t.Log(f.dumpIndexString(0, 100))

// It should allow writing item 6.
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())

@@ -718,7 +718,7 @@ func TestTruncateTail(t *testing.T) {
}

// Write 7 x 20 bytes, splitting out into four files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
@@ -838,7 +838,7 @@ func TestTruncateHead(t *testing.T) {
}

// Write 7 x 20 bytes, splitting out into four files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
@@ -863,7 +863,7 @@ func TestTruncateHead(t *testing.T) {
})

// Append new items
batch = f.newBatch()
batch = f.newBatch(0)
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11)))
@@ -930,7 +930,7 @@ func getChunk(size int, b int) []byte {
func writeChunks(t *testing.T, ft *freezerTable, n int, length int) {
t.Helper()

batch := ft.newBatch()
batch := ft.newBatch(0)
for i := 0; i < n; i++ {
if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil {
t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err)
@@ -1198,7 +1198,7 @@ func TestFreezerReadonly(t *testing.T) {

// Case 5: Now write some data via a batch.
// This should fail either during AppendRaw or Commit
batch := f.newBatch()
batch := f.newBatch(0)
writeErr := batch.AppendRaw(32, make([]byte, 1))

if writeErr == nil {
@@ -1366,7 +1366,7 @@ func runRandTest(rt randTest) bool {
}

case opAppend:
batch := f.newBatch()
batch := f.newBatch(0)
for i := 0; i < len(step.items); i++ {
batch.AppendRaw(step.items[i], step.blobs[i])
}
8 changes: 3 additions & 5 deletions core/rawdb/freezer_test.go
@@ -290,14 +290,12 @@ func TestFreezerReadonlyValidate(t *testing.T) {
}

var item = make([]byte, 1024)

aBatch := f.tables["a"].newBatch()
aBatch := f.tables["a"].newBatch(0)
require.NoError(t, aBatch.AppendRaw(0, item))
require.NoError(t, aBatch.AppendRaw(1, item))
require.NoError(t, aBatch.AppendRaw(2, item))
require.NoError(t, aBatch.commit())

bBatch := f.tables["b"].newBatch()
bBatch := f.tables["b"].newBatch(0)
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())

@@ -330,7 +328,7 @@ func TestFreezerConcurrentReadonly(t *testing.T) {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
batch := f.tables["a"].newBatch()
batch := f.tables["a"].newBatch(0)
items := uint64(10)
for i := uint64(0); i < items; i++ {
require.NoError(t, batch.AppendRaw(i, item))
25 changes: 13 additions & 12 deletions internal/cli/snapshot.go
@@ -18,7 +18,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/trie"
"github.com/prometheus/tsdb/fileutil"
"github.com/gofrs/flock"

"github.com/mitchellh/cli"
)
@@ -295,7 +295,7 @@ func (c *PruneBlockCommand) Run(args []string) int {
}
defer node.Close()

dbHandles, err := server.MakeDatabaseHandles()
dbHandles, err := server.MakeDatabaseHandles(0)
if err != nil {
c.UI.Error(err.Error())
return 1
@@ -330,8 +330,8 @@ func (c *PruneBlockCommand) accessDb(stack *node.Node, dbHandles int) error {
return errors.New("failed to load head block")
}
headHeader := headBlock.Header()
//Make sure the MPT and snapshot matches before pruning, otherwise the node can not start.
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, headBlock.Root(), false, false, false)
// Make sure the MPT and snapshot matches before pruning, otherwise the node can not start.
snaptree, err := snapshot.New(snapshot.Config{CacheSize: 256}, chaindb, trie.NewDatabase(chaindb, nil), headBlock.Root())
if err != nil {
log.Error("snaptree error", "err", err)
return err // The relevant snapshot(s) might not exist
@@ -358,7 +358,7 @@ func (c *PruneBlockCommand) accessDb(stack *node.Node, dbHandles int) error {
// Ensure the root is really present. The weak assumption
// is the presence of root can indicate the presence of the
// entire trie.
if blob := rawdb.ReadTrieNode(chaindb, targetRoot); len(blob) == 0 {
if blob := rawdb.ReadLegacyTrieNode(chaindb, targetRoot); len(blob) == 0 {
// The special case is for clique based networks(rinkeby, goerli
// and some other private networks), it's possible that two
// consecutive blocks will have same root. In this case snapshot
@@ -372,15 +372,15 @@ func (c *PruneBlockCommand) accessDb(stack *node.Node, dbHandles int) error {
// as the pruning target.
var found bool
for i := len(layers) - 2; i >= 1; i-- {
if blob := rawdb.ReadTrieNode(chaindb, layers[i].Root()); len(blob) != 0 {
if blob := rawdb.ReadLegacyTrieNode(chaindb, layers[i].Root()); len(blob) != 0 {
targetRoot = layers[i].Root()
found = true
log.Info("Selecting middle-layer as the pruning target", "root", targetRoot, "depth", i)
break
}
}
if !found {
if blob := rawdb.ReadTrieNode(chaindb, snaptree.DiskRoot()); len(blob) != 0 {
if blob := rawdb.ReadLegacyTrieNode(chaindb, snaptree.DiskRoot()); len(blob) != 0 {
targetRoot = snaptree.DiskRoot()
found = true
log.Info("Selecting disk-layer as the pruning target", "root", targetRoot)
@@ -422,14 +422,16 @@ func (c *PruneBlockCommand) pruneBlock(stack *node.Node, fdHandles int) error {

blockpruner := pruner.NewBlockPruner(stack, oldAncientPath, newAncientPath, c.blockAmountReserved)

lock, exist, err := fileutil.Flock(filepath.Join(oldAncientPath, "PRUNEFLOCK"))
lock := flock.New(filepath.Join(oldAncientPath, "PRUNEFLOCK"))
locked, err := lock.TryLock()
if err != nil {
log.Error("file lock error", "err", err)
return err
}
if exist {
defer lock.Release()
log.Info("file lock existed, waiting for prune recovery and continue", "err", err)
defer lock.Close()

if !locked {
log.Info("file lock existed, waiting for prune recovery and continue")
if err := blockpruner.RecoverInterruption("chaindata", c.cache, fdHandles, "", false); err != nil {
log.Error("Pruning failed", "err", err)
return err
@@ -460,7 +462,6 @@ func (c *PruneBlockCommand) pruneBlock(stack *node.Node, fdHandles int) error {
return err
}

lock.Release()
log.Info("Block prune successfully")

return nil
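Besides the pruning-target lookups, the lock handling in internal/cli/snapshot.go switches from github.com/prometheus/tsdb/fileutil to github.com/gofrs/flock: instead of a helper that reports whether a lock file already existed, `flock.Flock.TryLock` reports whether the lock was acquired, so the recovery branch now triggers on `!locked`. Below is a minimal sketch of that pattern in isolation; the path and log messages are placeholders, not the pruner's actual values.

```go
package main

import (
	"log"
	"path/filepath"

	"github.com/gofrs/flock"
)

func main() {
	// Create the lock handle; nothing is locked until TryLock is called.
	lock := flock.New(filepath.Join("/tmp", "PRUNEFLOCK"))

	locked, err := lock.TryLock()
	if err != nil {
		log.Fatalf("file lock error: %v", err)
	}
	defer lock.Close() // releases the lock and closes the underlying file

	if !locked {
		// Another process holds the lock; in the pruner this signals that a
		// previous prune was interrupted and the recovery path should run.
		log.Println("lock already held, running recovery path")
		return
	}

	log.Println("lock acquired, safe to prune")
}
```

Because `Close` both releases the lock and closes the file, the explicit `lock.Release()` call at the end of the old code is no longer needed, which is why the diff above removes it.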
