Merge pull request ethereum#353 from ethersphere/adapt_export_import
fix swarm db export import
nonsense authored Mar 30, 2018
2 parents 167159b + af16c61 commit d6e157c
Showing 5 changed files with 266 additions and 27 deletions.
139 changes: 139 additions & 0 deletions cmd/swarm/export_test.go
@@ -0,0 +1,139 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"bytes"
"crypto/md5"
"crypto/rand"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"testing"

"github.com/ethereum/go-ethereum/swarm"
)

// TestCLISwarmExportImport performs the following test:
// 1. runs swarm node
// 2. uploads a random file
// 3. runs an export of the local datastore
// 4. runs a second swarm node
// 5. imports the exported datastore
// 6. fetches the uploaded random file from the second node
func TestCLISwarmExportImport(t *testing.T) {
cluster := newTestCluster(t, 1)

// generate a random 10MB file
f, cleanup := generateRandomFile(t, 10000000)
defer cleanup()

// upload the file with 'swarm up' and expect a hash
up := runSwarm(t, "--bzzapi", cluster.Nodes[0].URL, "up", f.Name())
_, matches := up.ExpectRegexp(`[a-f\d]{64}`)
up.ExpectExit()
hash := matches[0]

var info swarm.Info
if err := cluster.Nodes[0].Client.Call(&info, "bzz_info"); err != nil {
t.Fatal(err)
}

cluster.Stop()
defer cluster.Cleanup()

// generate an export.tar
exportCmd := runSwarm(t, "db", "export", info.Path+"/chunks", info.Path+"/export.tar", strings.TrimPrefix(info.BzzKey, "0x"))
exportCmd.ExpectExit()

// start second cluster
cluster2 := newTestCluster(t, 1)

var info2 swarm.Info
if err := cluster2.Nodes[0].Client.Call(&info2, "bzz_info"); err != nil {
t.Fatal(err)
}

// stop the second cluster so its LevelDB store is closed (the import needs exclusive access)
cluster2.Stop()
defer cluster2.Cleanup()

// import the export.tar
importCmd := runSwarm(t, "db", "import", info2.Path+"/chunks", info.Path+"/export.tar", strings.TrimPrefix(info2.BzzKey, "0x"))
importCmd.ExpectExit()

// spin second cluster back up
cluster2.StartExistingNodes(t, 1, strings.TrimPrefix(info2.BzzAccount, "0x"))

// try to fetch imported file
res, err := http.Get(cluster2.Nodes[0].URL + "/bzz:/" + hash)
if err != nil {
t.Fatal(err)
}

if res.StatusCode != 200 {
t.Fatalf("expected HTTP status %d, got %s", 200, res.Status)
}

// compare downloaded file with the generated random file
mustEqualFiles(t, f, res.Body)
}

func mustEqualFiles(t *testing.T, up io.Reader, down io.Reader) {
h := md5.New()
upLen, err := io.Copy(h, up)
if err != nil {
t.Fatal(err)
}
upHash := h.Sum(nil)
h.Reset()
downLen, err := io.Copy(h, down)
if err != nil {
t.Fatal(err)
}
downHash := h.Sum(nil)

if !bytes.Equal(upHash, downHash) || upLen != downLen {
t.Fatalf("downloaded imported file md5=%x (length %v) is not the same as the generated one mp5=%x (length %v)", downHash, downLen, upHash, upLen)
}
}

func generateRandomFile(t *testing.T, size int) (f *os.File, teardown func()) {
// create a tmp file
tmp, err := ioutil.TempFile("", "swarm-test")
if err != nil {
t.Fatal(err)
}

// callback for tmp file cleanup
teardown = func() {
tmp.Close()
os.Remove(tmp.Name())
}

// write `size` bytes of random data to the file
buf := make([]byte, size)
_, err = rand.Read(buf)
if err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(tmp.Name(), buf, 0755); err != nil {
t.Fatal(err)
}

return tmp, teardown
}
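
The round trip this test automates can also be driven by hand with the same subcommands. A minimal sketch using os/exec, assuming a swarm binary on PATH and that both nodes are stopped so their LevelDB stores are closed; all paths and keys below are placeholders:

package main

import (
	"log"
	"os/exec"
)

// runSwarmDB shells out to the swarm CLI, mirroring the argument order used
// in the test above: swarm db <export|import> <chunk db path> <tar file> <bzzkey>.
func runSwarmDB(args ...string) {
	out, err := exec.Command("swarm", append([]string{"db"}, args...)...).CombinedOutput()
	if err != nil {
		log.Fatalf("swarm db %v failed: %v\n%s", args, err, out)
	}
}

func main() {
	runSwarmDB("export", "/tmp/node1/chunks", "/tmp/export.tar", "aaaa")
	runSwarmDB("import", "/tmp/node2/chunks", "/tmp/export.tar", "bbbb")
}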
116 changes: 104 additions & 12 deletions cmd/swarm/run_test.go
@@ -81,6 +81,7 @@ type testCluster struct {
//
// When starting more than one node, they are connected together using the
// admin SetPeer RPC method.

func newTestCluster(t *testing.T, size int) *testCluster {
cluster := &testCluster{}
defer func() {
@@ -96,18 +97,7 @@ func newTestCluster(t *testing.T, size int) *testCluster {
cluster.TmpDir = tmpdir

// start the nodes
- cluster.Nodes = make([]*testNode, 0, size)
- for i := 0; i < size; i++ {
- dir := filepath.Join(cluster.TmpDir, fmt.Sprintf("swarm%02d", i))
- if err := os.Mkdir(dir, 0700); err != nil {
- t.Fatal(err)
- }
-
- node := newTestNode(t, dir)
- node.Name = fmt.Sprintf("swarm%02d", i)
-
- cluster.Nodes = append(cluster.Nodes, node)
- }
+ cluster.StartNewNodes(t, size)

if size == 1 {
return cluster
@@ -145,6 +135,42 @@ func (c *testCluster) Shutdown() {
os.RemoveAll(c.TmpDir)
}

func (c *testCluster) Stop() {
for _, node := range c.Nodes {
node.Shutdown()
}
}

func (c *testCluster) StartNewNodes(t *testing.T, size int) {
c.Nodes = make([]*testNode, 0, size)
for i := 0; i < size; i++ {
dir := filepath.Join(c.TmpDir, fmt.Sprintf("swarm%02d", i))
if err := os.Mkdir(dir, 0700); err != nil {
t.Fatal(err)
}

node := newTestNode(t, dir)
node.Name = fmt.Sprintf("swarm%02d", i)

c.Nodes = append(c.Nodes, node)
}
}

func (c *testCluster) StartExistingNodes(t *testing.T, size int, bzzaccount string) {
c.Nodes = make([]*testNode, 0, size)
for i := 0; i < size; i++ {
dir := filepath.Join(c.TmpDir, fmt.Sprintf("swarm%02d", i))
node := existingTestNode(t, dir, bzzaccount)
node.Name = fmt.Sprintf("swarm%02d", i)

c.Nodes = append(c.Nodes, node)
}
}

func (c *testCluster) Cleanup() {
os.RemoveAll(c.TmpDir)
}

type testNode struct {
Name string
Addr string
@@ -181,6 +207,72 @@ func getTestAccount(t *testing.T, dir string) (conf *node.Config, account accounts.Account) {
return conf, account
}

func existingTestNode(t *testing.T, dir string, bzzaccount string) *testNode {
conf, _ := getTestAccount(t, dir)
node := &testNode{Dir: dir}

// use a unique IPCPath when running tests on Windows
if runtime.GOOS == "windows" {
conf.IPCPath = fmt.Sprintf("bzzd-%s.ipc", bzzaccount)
}

// assign ports
httpPort, err := assignTCPPort()
if err != nil {
t.Fatal(err)
}
p2pPort, err := assignTCPPort()
if err != nil {
t.Fatal(err)
}

// start the node
node.Cmd = runSwarm(t,
"--port", p2pPort,
"--nodiscover",
"--datadir", dir,
"--ipcpath", conf.IPCPath,
"--ens-api", "",
"--bzzaccount", bzzaccount,
"--bzznetworkid", "321",
"--bzzport", httpPort,
"--verbosity", "6",
)
node.Cmd.InputLine(testPassphrase)
defer func() {
if t.Failed() {
node.Shutdown()
}
}()

// wait for the node to start
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {
node.Client, err = rpc.Dial(conf.IPCEndpoint())
if err == nil {
break
}
}
if node.Client == nil {
t.Fatal(err)
}

// load info
var info swarm.Info
if err := node.Client.Call(&info, "bzz_info"); err != nil {
t.Fatal(err)
}
node.Addr = net.JoinHostPort("127.0.0.1", info.Port)
node.URL = "http://" + node.Addr

var nodeInfo p2p.NodeInfo
if err := node.Client.Call(&nodeInfo, "admin_nodeInfo"); err != nil {
t.Fatal(err)
}
node.Enode = fmt.Sprintf("enode://%s@127.0.0.1:%s", nodeInfo.ID, p2pPort)

return node
}

func newTestNode(t *testing.T, dir string) *testNode {

conf, account := getTestAccount(t, dir)
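The run_test.go changes split the old Shutdown (stop the nodes and delete TmpDir) into separate Stop and Cleanup steps, and add StartExistingNodes so a cluster can come back up over the same data directories. A minimal sketch of how the pieces compose, assuming it lives alongside testCluster in the cmd/swarm test package; the offline callback stands in for any work that needs the store closed:

// restartAfterOfflineWork stops a running single-node cluster so LevelDB
// releases its lock, runs an offline step (e.g. swarm db import) against
// the closed chunk store, then restarts the node over its existing datadir.
func restartAfterOfflineWork(t *testing.T, c *testCluster, bzzaccount string, offline func()) {
	c.Stop()  // nodes down, TmpDir kept (unlike Shutdown)
	offline() // safe to touch the chunk store now
	c.StartExistingNodes(t, 1, bzzaccount)
}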
10 changes: 8 additions & 2 deletions common/bytes.go
@@ -17,7 +17,10 @@
// Package common contains various helper functions.
package common

- import "encoding/hex"
+ import (
+ "encoding/hex"
+ "fmt"
+ )

func ToHex(b []byte) string {
hex := Bytes2Hex(b)
@@ -78,7 +81,10 @@ func Bytes2Hex(d []byte) string {
}

func Hex2Bytes(str string) []byte {
- h, _ := hex.DecodeString(str)
+ h, err := hex.DecodeString(str)
+ if err != nil {
+ panic(fmt.Sprintf("cannot hex decode the string %s, got error %v", str, err))
+ }

return h
}
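Note the behavioral change: Hex2Bytes used to swallow decode errors and return whatever bytes were decoded; it now panics on malformed input. A small demonstration; callers handling untrusted input can decode via encoding/hex directly to keep an error return:

package main

import (
	"encoding/hex"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	fmt.Printf("%x\n", common.Hex2Bytes("c0ffee")) // valid input decodes as before

	// common.Hex2Bytes("zz") now panics instead of returning nil, so
	// decode directly when the input may be malformed:
	if _, err := hex.DecodeString("zz"); err != nil {
		fmt.Println("invalid hex:", err)
	}
}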
26 changes: 14 additions & 12 deletions swarm/storage/ldbstore.go
@@ -59,6 +59,7 @@ const (
)

var (
keyIndex = byte(0)
keyOldData = byte(1)
keyAccessCnt = []byte{2}
keyEntryCnt = []byte{3}
@@ -207,7 +208,7 @@ func (s *LDBStore) updateIndexAccess(index *dpaDBIndex) {
func getIndexKey(hash Key) []byte {
hashSize := len(hash)
key := make([]byte, hashSize+1)
- key[0] = 0
+ key[0] = keyIndex
copy(key[1:], hash[:])
return key
}
@@ -364,11 +365,13 @@ func (s *LDBStore) Export(out io.Writer) (int64, error) {
}

var index dpaDBIndex
- decodeIndex(it.Value(), &index)

hash := key[1:]

- data, err := s.db.Get(getDataKey(index.Idx, s.po(hash)))
+ decodeIndex(it.Value(), &index)
+ po := s.po(hash)
+ datakey := getDataKey(index.Idx, po)
+ log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
+ data, err := s.db.Get(datakey)
if err != nil {
log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
continue
Expand Down Expand Up @@ -421,7 +424,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
return count, err
}
chunk := NewChunk(key, nil)
- chunk.SData = data
+ chunk.SData = data[32:]
s.Put(chunk)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -559,10 +562,10 @@ func (s *LDBStore) Put(chunk *Chunk) {
s.lock.Lock()
defer s.lock.Unlock()

- log.Trace("ldbstore.put: s.db.Get", "key", chunk.Key)
+ log.Trace("ldbstore.put: s.db.Get", "key", chunk.Key, "ikey", fmt.Sprintf("%x", ikey))
idata, err := s.db.Get(ikey)
if err != nil {
- s.doPut(chunk, ikey, &index, po)
+ s.doPut(chunk, &index, po)
batchC := s.batchC
go func() {
<-batchC
@@ -584,10 +587,10 @@
}

// force putting into db, does not check access index
- func (s *LDBStore) doPut(chunk *Chunk, ikey []byte, index *dpaDBIndex, po uint8) {
- log.Trace("ldbstore.doPut", "key", chunk.Key)
+ func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) {
data := s.encodeDataFunc(chunk)
- s.batch.Put(getDataKey(s.dataIdx, po), data)
+ dkey := getDataKey(s.dataIdx, po)
+ s.batch.Put(dkey, data)
index.Idx = s.dataIdx
s.bucketCnt[po] = s.dataIdx
s.entryCnt++
@@ -597,7 +600,6 @@ func (s *LDBStore) doPut(chunk *Chunk, ikey []byte, index *dpaDBIndex, po uint8)
cntKey[0] = keyDistanceCnt
cntKey[1] = po
s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))

}

func (s *LDBStore) writeBatches() {
Expand Down Expand Up @@ -690,7 +692,7 @@ func (s *LDBStore) get(key Key) (chunk *Chunk, err error) {
proximity := s.po(key)
datakey := getDataKey(indx.Idx, proximity)
data, err = s.db.Get(datakey)
- log.Trace("ldbstore.get retrieve", "key", key, "indexkey", indx.Idx, "datakey", datakey, "proximity", proximity)
+ log.Trace("ldbstore.get retrieve", "key", key, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
if err != nil {
log.Trace("ldbstore.get chunk found but could not be accessed", "key", key, "err", err)
s.delete(indx.Idx, getIndexKey(key), s.po(key))
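The substantive Import fix is chunk.SData = data[32:]: each record in the export tar is the raw LevelDB data value, which carries a 32-byte prefix (by the slicing, presumably the chunk key) ahead of the chunk payload, so import strips it before re-storing. A sketch of that framing; the helper name and placement are illustrative, not part of the diff:

package storage

import "fmt"

// splitExportRecord splits a raw exported record into the parts the Import
// fix assumes: a 32-byte chunk key prefix followed by the chunk payload.
func splitExportRecord(data []byte) (key, payload []byte, err error) {
	if len(data) < 32 {
		return nil, nil, fmt.Errorf("export record too short: %d bytes", len(data))
	}
	return data[:32], data[32:], nil
}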
