Skip to content

Commit

Permalink
core: make TestFastVsFullChains_RemoteFreezer to handle external rpc …
Browse files Browse the repository at this point in the history
…endpoint

- Install configurable env var GETH_ANCIENT_RPC to
allow user to define an external endpoint handled by
an independent freezer server instance. This can be
a socket, http, ws, or stdio.

Note that since the AncientStore API doesn't have a reset-button,
running multiple tests against the same freezer instance
may have negative outcomes, since the data stores will not
represent consistent collection (chains).

If no external freezer is configured, the default
will create an ephemeral memory-packed freezer
server and connect to it over a temporary IPC socket.

Signed-off-by: meows <b5c6@protonmail.com>
  • Loading branch information
meowsbits committed Jul 30, 2020
1 parent 79a9ffe commit 76f8b65
Showing 1 changed file with 99 additions and 50 deletions.
149 changes: 99 additions & 50 deletions core/blockchain_remotefreezer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package core

import (
"io/ioutil"
"log"
"math/big"
"os"
"path/filepath"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/params/types/genesisT"
"github.com/ethereum/go-ethereum/params/vars"
Expand All @@ -41,25 +41,49 @@ import (
// Tests in this file duplicate select tests from blockchain_test.go,
// replacing the built in file-based ancient db with a remote one over IPC.

func remoteIPCFreezerServer(t *testing.T) (ipcPath string, server *rpc.Server) {
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
ipcPath = filepath.Join(frdir, "test.ipc")
listener, server, err := rpc.StartIPCEndpoint(ipcPath, nil)
if err != nil {
log.Fatalln(err)
var (
// testRPCFreezerURL = flag.String("ancient.rpc", "", "Configurable remote freezer server RPC endpoint")
testRPCFreezerURL = os.Getenv("GETH_ANCIENT_RPC")
)

// testRPCRemoteFreezer provides a configuration option to use an external
// remote freezer server, or to default (with no configured flags) to a built in
// ephemeral in-memory server over a temporary unix socket.
// If an external URL is configured via --ancient.rpc, then the return value for 'server' will be nil.
func testRPCRemoteFreezer(t *testing.T) (rpcFreezerEndpoint string, server *rpc.Server, ancientDB ethdb.Database) {
if testRPCFreezerURL == "" {
// If an external freezer server is not provided, spin up an ephemeral
// freezer over IPC.
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
rpcFreezerEndpoint = filepath.Join(frdir, "test.ipc")

listener, server, err := rpc.StartIPCEndpoint(rpcFreezerEndpoint, nil)
if err != nil {
t.Fatal(err)
}
mock := lib.NewMockFreezerRemoteServerAPI()
err = server.RegisterName("freezer", mock)
if err != nil {
t.Fatal(err)
}
go func() {
server.ServeListener(listener)
}()

} else {
rpcFreezerEndpoint = testRPCFreezerURL
t.Log("Using external freezer:", rpcFreezerEndpoint)
}
mock := lib.NewMockFreezerRemoteServerAPI()
err = server.RegisterName("freezer", mock)

ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), rpcFreezerEndpoint)
if err != nil {
log.Fatalln(err)
t.Fatalf("failed to create temp freezer db: %v", err)
}
go func() {
server.ServeListener(listener)
}()
return ipcPath, server

return rpcFreezerEndpoint, server, ancientDb
}

// Tests that fast importing a block chain produces the same chain data as the
Expand Down Expand Up @@ -105,6 +129,7 @@ func TestFastVsFullChains_RemoteFreezer(t *testing.T) {
if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err)
}

// Fast import the chain as a non-archive node to test
fastDb := rawdb.NewMemoryDatabase()
MustCommitGenesis(fastDb, gspec)
Expand All @@ -123,24 +148,52 @@ func TestFastVsFullChains_RemoteFreezer(t *testing.T) {
}

// Freezer style fast import the chain.
ipcPath, server := remoteIPCFreezerServer(t)
defer os.RemoveAll(filepath.Dir(ipcPath))
defer server.Stop()
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
freezerRPCEndpoint, server, ancientDb := testRPCRemoteFreezer(t)
if server != nil {
defer os.RemoveAll(filepath.Dir(freezerRPCEndpoint))
defer server.Stop()
}
defer ancientDb.Close() // Cause the Close method to be called.

MustCommitGenesis(ancientDb, gspec)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
defer ancient.Stop()

ancientLimit := uint64(len(blocks)/2)

if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
if n, err := ancient.InsertReceiptChain(blocks, receipts, ancientLimit); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}

// Test a rollback, causing the ancient store to use the TruncateAncient method.
pinch := len(blocks)/4
rollbackHeaders := []common.Hash{}
for _, v := range headers[pinch:] {
rollbackHeaders = append(rollbackHeaders, v.Hash())
}
ancient.Rollback(rollbackHeaders)

// Reinsert the rolled-back headers and receipts.
if n, err := ancient.InsertHeaderChain(headers[pinch:], 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks[pinch:], receipts, ancientLimit); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}

// Explicitly call the HasAncient method.
// This method doesn't appear to be used in normal geth operation, and I'm not sure
// why it exists, but we want to make sure that all API-defined methods get used.
for _, b := range blocks {
want := b.NumberU64() <= ancientLimit
if ok, err := ancientDb.HasAncient(rawdb.FreezerRemoteHeaderTable, b.NumberU64()); err != nil || ok != want {
t.Fatalf("ancientdb !HasAncient #%d: error=%v HasAncient=%v want=%v", b.NumberU64(), err, ok, want)
}
}

// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
Expand Down Expand Up @@ -194,13 +247,12 @@ func TestBlockchainRecovery_RemoteFreezer(t *testing.T) {

// Import the chain as a ancient-first node and ensure all pointers are updated
// Freezer style fast import the chain.
ipcPath, server := remoteIPCFreezerServer(t)
defer os.RemoveAll(filepath.Dir(ipcPath))
defer server.Stop()
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
freezerRPCEndpoint, server, ancientDb := testRPCRemoteFreezer(t)
if server != nil {
defer os.RemoveAll(filepath.Dir(freezerRPCEndpoint))
defer server.Stop()
}
defer ancientDb.Close() // Cause the Close method to be called.

MustCommitGenesis(ancientDb, gspec)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
Expand Down Expand Up @@ -249,13 +301,12 @@ func TestIncompleteAncientReceiptChainInsertion_RemoteFreezer(t *testing.T) {
blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)

// Import the chain as a ancient-first node and ensure all pointers are updated
ipcPath, server := remoteIPCFreezerServer(t)
defer os.RemoveAll(filepath.Dir(ipcPath))
defer server.Stop()
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
freezerRPCEndpoint, server, ancientDb := testRPCRemoteFreezer(t)
if server != nil {
defer os.RemoveAll(filepath.Dir(freezerRPCEndpoint))
defer server.Stop()
}
defer ancientDb.Close() // Cause the Close method to be called.
MustCommitGenesis(ancientDb, gspec)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
defer ancient.Stop()
Expand Down Expand Up @@ -344,13 +395,12 @@ func TestTransactionIndices_RemoteFreezer(t *testing.T) {
}
}
}
ipcPath, server := remoteIPCFreezerServer(t)
defer os.RemoveAll(filepath.Dir(ipcPath))
defer server.Stop()
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
freezerRPCEndpoint, server, ancientDb := testRPCRemoteFreezer(t)
if server != nil {
defer os.RemoveAll(filepath.Dir(freezerRPCEndpoint))
defer server.Stop()
}
defer ancientDb.Close() // Cause the Close method to be called.
MustCommitGenesis(ancientDb, gspec)

// Import all blocks into ancient db
Expand All @@ -375,7 +425,7 @@ func TestTransactionIndices_RemoteFreezer(t *testing.T) {
// Init block chain with external ancients, check all needed indices has been indexed.
limit := []uint64{0, 32, 64, 128}
for _, l := range limit {
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), freezerRPCEndpoint)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand All @@ -395,7 +445,7 @@ func TestTransactionIndices_RemoteFreezer(t *testing.T) {
}

// Reconstruct a block chain which only reserves HEAD-64 tx indices
ancientDb, err = rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
ancientDb, err = rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), freezerRPCEndpoint)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -469,13 +519,12 @@ func TestSkipStaleTxIndicesInFastSync_RemoteFreezer(t *testing.T) {
}
}

ipcPath, server := remoteIPCFreezerServer(t)
defer os.RemoveAll(filepath.Dir(ipcPath))
defer server.Stop()
ancientDb, err := rawdb.NewDatabaseWithFreezerRemote(rawdb.NewMemoryDatabase(), ipcPath)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
freezerRPCEndpoint, server, ancientDb := testRPCRemoteFreezer(t)
if server != nil {
defer os.RemoveAll(filepath.Dir(freezerRPCEndpoint))
defer server.Stop()
}
defer ancientDb.Close() // Cause the Close method to be called.
MustCommitGenesis(ancientDb, gspec)

// Import all blocks into ancient db, only HEAD-32 indices are kept.
Expand Down

0 comments on commit 76f8b65

Please sign in to comment.