Integrate encryption into dpa and chunker (ethereum#327)
* swarm: Integrate encryption into dpa and chunker

This is not yet a fully functional change.
The chunker API changed: it now uses Putter and Getter interfaces to put and get
chunks from the store and to do the encryption/decryption if needed. These
putters and getters should also do the hashing, so hashing-related code
could be removed from the chunker.
Some things are still temporary: the pyramid chunker doesn't work yet (only the
tree chunker), and hashing and putting in the DPA are not parallelized.
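
A rough sketch of what such interfaces look like is below. This is illustrative only,
based on this commit message rather than the exact definitions in swarm/storage, so
the method names and signatures are assumptions; the point is that hashing, optional
encryption, and storing live behind Putter/Getter instead of inside the chunker.

```go
// Illustrative sketch only, not the actual swarm/storage definitions.

// Putter hashes (and, if enabled, encrypts) chunk data, hands it to the
// underlying store, and returns the reference for later retrieval.
type Putter interface {
	Put(chunkData []byte) (ref []byte, err error)
	// RefSize is the length of references returned by Put: the hash size,
	// or hash size plus decryption-key size when encryption is enabled.
	RefSize() int64
	// Close signals that no more chunk data will be put.
	Close()
	// Wait blocks until everything handed to Put has been stored.
	Wait()
}

// Getter retrieves (and, if needed, decrypts) chunk data by its reference.
type Getter interface {
	Get(ref []byte) (chunkData []byte, err error)
}
```

With this split, the tree and pyramid chunkers only decide how data is cut into
chunks and how the tree is built; everything reference-related is delegated to the
Putter/Getter implementation (hasherStore in this change).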

* swarm/storage: Use TreeChunker in DPA

PyramidChunker doesn't work yet with Putter/Getter

* swarm/storage: HasherStore needs a new hasher instance on every chunk
creation

This fixes the tests with the pyramid chunker.
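
The underlying issue: a hash.Hash instance carries internal state, so one shared
hasher cannot be reused for chunks that are hashed concurrently. A minimal sketch of
the idea with assumed names (the real hasherStore code is more involved):

```go
package storage

import "hash"

// hasherStore sketch: keep a hasher factory, not a single hasher instance.
type hasherStore struct {
	hashFunc func() hash.Hash // e.g. a SHA3/BMT hasher constructor
}

// createHash makes a fresh hasher for every chunk; reusing one instance
// across chunks created in parallel would mix their internal state.
func (h *hasherStore) createHash(chunkData []byte) []byte {
	hasher := h.hashFunc()
	hasher.Write(chunkData)
	return hasher.Sum(nil)
}
```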

* swarm/storage: Add back tree/pyramid comparison in chunker test

* swarm: Refactor chunker, new chunker instance is needed for each job

The chunker code is much simpler if all the data of a job is stored on the
chunker object itself.
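
Put differently: instead of one long-lived chunker receiving per-job parameters on
every call, each split/join job builds its own chunker whose fields hold that job's
state. A rough, assumed shape (not the actual struct from this commit; Putter is the
interface sketched above, and 4096 is only an assumed default chunk size):

```go
package storage

import "io"

// Assumed sketch of the "one chunker instance per job" layout.
type TreeChunker struct {
	data        io.Reader // input of this job only
	dataSize    int64
	chunkSize   int64
	putter      Putter // hashing/encryption/storing for this job
	workerCount int64  // per-job worker bookkeeping also lives here
	errC        chan error
}

// A fresh instance per job means no state can leak between jobs and no job
// parameters need to be threaded through every method call.
func newTreeChunker(data io.Reader, size int64, putter Putter) *TreeChunker {
	return &TreeChunker{data: data, dataSize: size, chunkSize: 4096, putter: putter}
}
```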

* swarm/storage: Remove commented code

* swarm/storage: Added comments

* cmd/swarm: Remove Branches from config

* swarm/storage: Remove commented code

* swarm/network: Remove chunker from syncer

* swarm/storage: Remove Chunker/Splitter/Joiner interfaces

They are not used anymore

* swarm/storage: Fix comment

* swarm/storage: Reenable chunker_test.TestDataAppend

* swarm/storage: Fix comments

* swarm/storage, cmd/swarm: Add FakeChunkStore

Instead of using a nil ChunkStore in hasherStore when we don't want the
hasherStore to actually store the chunk data, a FakeChunkStore instance
should now be used. Because of this, the nil checks on hasherStore.store
could be removed.
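
The idea is a ChunkStore whose Put simply discards the data, so hasherStore can call
store.Put unconditionally. A hedged sketch with assumed Chunk/Key shapes and method
set (the real FakeChunkStore may differ, e.g. it could record the keys it has seen):

```go
package storage

import "errors"

// FakeChunkStore sketch: accepts chunks but never persists them. Useful when
// only the hashing / reference computation of hasherStore is needed.
type FakeChunkStore struct{}

// Put discards the chunk data.
func (f *FakeChunkStore) Put(chunk *Chunk) {}

// Get always fails because nothing is ever stored.
func (f *FakeChunkStore) Get(key Key) (*Chunk, error) {
	return nil, errors.New("fake chunk store: chunk data is not stored")
}

func (f *FakeChunkStore) Close() {}
```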

* swarm/storage: Use DefaultChunkSize as a constant, not DefaultBranches

It is the branch count that changes, and it depends on the chunk size and the
hash size.
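
In other words, the branching factor of the chunk tree is derived, not configured: it
is the chunk size divided by the reference size, and with encryption the references
are twice as long (hash plus decryption key), so the branch count halves. A small
illustrative calculation (the 4096-byte chunk size and 32-byte hash are assumed
defaults):

```go
const DefaultChunkSize int64 = 4096 // assumed default chunk size

// branches: how many child references fit into one intermediate chunk.
func branches(chunkSize, refSize int64) int64 {
	return chunkSize / refSize
}

// Unencrypted: 32-byte hash references             -> 4096/32 = 128 branches.
// Encrypted:   32-byte hash + 32-byte key = 64 B   -> 4096/64 =  64 branches.
```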

* swarm/storage: Remove unnecessary return statement

* swarm/storage: Rename unfinishedChunk to unfinishedChunkData

* swarm/storage: Move NewTreeSplitterParams to chunker.go

It was in pyramid.go, although TreeSplitter is in chunker.go

* swarm/storage: Minor cleanup

incrementWorkerCount can be moved into runWorker
remove obsolete comment

* swarm/storage: Add TODO comment to chunker depth parameter

Depth can only be 0 because of a bug: ethersphere/swarm#344

* swarm/storage: Fix padding in hasherStore

There was a bug where the data was not extended to the padded length.
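
Short chunks have to be brought up to the full padded length before encryption,
otherwise the chunk size leaks how much real data it contains; the bug was that the
buffer was never actually extended. A minimal sketch of the intended behaviour, with
assumed names and random-byte padding:

```go
package storage

import "crypto/rand"

// padChunkData sketch: extend data to paddedSize before it is encrypted.
func padChunkData(data []byte, paddedSize int) ([]byte, error) {
	if len(data) >= paddedSize {
		return data, nil
	}
	padded := make([]byte, paddedSize) // allocate the full padded length
	copy(padded, data)
	// fill the tail with random bytes so the padding is not predictable
	if _, err := rand.Read(padded[len(data):]); err != nil {
		return nil, err
	}
	return padded, nil
}
```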

* swarm/storage: Add unit test for hasherStore

* swarm/storage: Decrease initial counter in span encryption

* swarm/storage: Renames in benchmark tests

* swarm/storage: Fixed pyramid benchmark test

* swarm/storage: Convert TestHasherStore to table-driven test
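
Table-driven here means a single test body looping over a slice of cases (for
instance, different data lengths with encryption on and off) instead of
near-duplicate test functions. A generic skeleton of that shape, with the case
fields assumed:

```go
import "testing"

func TestHasherStore(t *testing.T) {
	cases := []struct {
		dataLength int
		toEncrypt  bool
	}{
		{10, false},
		{10, true},
		{4096, false},
		{4096, true},
	}
	for _, tc := range cases {
		// put dataLength bytes through the hasherStore (encrypted or not),
		// then check that Get returns the same data for the returned key
		_ = tc
	}
}
```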

* swarm/storage, cmd/swarm: New MapChunkStore, simple ChunkStore implementation

It is not only the tests that need this kind of implementation.
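
A MapChunkStore is about the simplest possible ChunkStore: an in-memory map from
chunk key to chunk, guarded by a mutex. A hedged sketch (field names and the exact
ChunkStore method set are assumed):

```go
package storage

import (
	"errors"
	"sync"
)

// MapChunkStore sketch: in-memory ChunkStore backed by a map. Handy not only
// in tests but also for commands like `swarm hash`, which need a store but no
// persistence.
type MapChunkStore struct {
	mu     sync.RWMutex
	chunks map[string]*Chunk
}

func NewMapChunkStore() *MapChunkStore {
	return &MapChunkStore{chunks: make(map[string]*Chunk)}
}

func (m *MapChunkStore) Put(chunk *Chunk) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.chunks[string(chunk.Key)] = chunk // Key assumed to be a byte-slice type
}

func (m *MapChunkStore) Get(key Key) (*Chunk, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	chunk, ok := m.chunks[string(key)]
	if !ok {
		return nil, errors.New("chunk not found")
	}
	return chunk, nil
}

func (m *MapChunkStore) Close() {}
```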

* swarm/storage: Cleanup on hashSize/chunkSize handling
gbalint authored Mar 30, 2018
1 parent 6d182b8 commit 167159b
Showing 31 changed files with 1,012 additions and 847 deletions.
10 changes: 0 additions & 10 deletions cmd/swarm/config_test.go
@@ -157,7 +157,6 @@ func TestConfigFileOverrides(t *testing.T) {
defaultConf.NetworkId = 54
defaultConf.Port = httpPort
defaultConf.StoreParams.DbCapacity = 9000000
defaultConf.ChunkerParams.Branches = 64
defaultConf.HiveParams.KeepAliveInterval = 6000000000
defaultConf.Swap.Params.Strategy.AutoCashInterval = 600 * time.Second
//defaultConf.SyncParams.KeyBufferSize = 512
@@ -237,10 +236,6 @@ func TestConfigFileOverrides(t *testing.T) {
t.Fatalf("Expected network ID to be %d, got %d", 54, info.NetworkId)
}

if info.ChunkerParams.Branches != 64 {
t.Fatalf("Expected chunker params branches to be %d, got %d", 64, info.ChunkerParams.Branches)
}

if info.HiveParams.KeepAliveInterval != 6000000000 {
t.Fatalf("Expected HiveParams KeepAliveInterval to be %d, got %d", uint64(6000000000), uint64(info.HiveParams.KeepAliveInterval))
}
@@ -374,7 +369,6 @@ func TestConfigCmdLineOverridesFile(t *testing.T) {
defaultConf.NetworkId = 54
defaultConf.Port = "8588"
defaultConf.StoreParams.DbCapacity = 9000000
defaultConf.ChunkerParams.Branches = 64
defaultConf.HiveParams.KeepAliveInterval = 6000000000
defaultConf.Swap.Params.Strategy.AutoCashInterval = 600 * time.Second
//defaultConf.SyncParams.KeyBufferSize = 512
@@ -456,10 +450,6 @@ func TestConfigCmdLineOverridesFile(t *testing.T) {
t.Fatalf("Expected network ID to be %d, got %d", 54, info.NetworkId)
}

if info.ChunkerParams.Branches != 64 {
t.Fatalf("Expected chunker params branches to be %d, got %d", 64, info.ChunkerParams.Branches)
}

if info.HiveParams.KeepAliveInterval != 6000000000 {
t.Fatalf("Expected HiveParams KeepAliveInterval to be %d, got %d", uint64(6000000000), uint64(info.HiveParams.KeepAliveInterval))
}

4 changes: 2 additions & 2 deletions cmd/swarm/hash.go
@@ -38,8 +38,8 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
chunker := storage.NewTreeChunker(storage.NewChunkerParams())
key, _, err := chunker.Split(f, stat.Size(), nil)
dpa := storage.NewDPA(storage.NewMapChunkStore(), storage.NewDPAParams())
key, _, err := dpa.Store(f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
} else {

6 changes: 3 additions & 3 deletions swarm/api/api.go
@@ -243,7 +243,7 @@ func (self *Api) Retrieve(key storage.Key) storage.LazySectionReader {

func (self *Api) Store(data io.Reader, size int64) (key storage.Key, wait func(), err error) {
log.Debug("api.store", "size", size)
return self.dpa.Store(data, size)
return self.dpa.Store(data, size, false)
}

type ErrResolve error
@@ -286,14 +286,14 @@ func (self *Api) Resolve(uri *URI) (storage.Key, error) {
func (self *Api) Put(content, contentType string) (k storage.Key, wait func(), err error) {
apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := self.dpa.Store(r, int64(len(content)))
key, waitContent, err := self.dpa.Store(r, int64(len(content)), false)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := self.dpa.Store(r, int64(len(manifest)))
key, waitManifest, err := self.dpa.Store(r, int64(len(manifest)), false)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err

2 changes: 0 additions & 2 deletions swarm/api/api_test.go
@@ -43,9 +43,7 @@ func testApi(t *testing.T, f func(*Api)) {
return
}
api := NewApi(dpa, nil, nil)
dpa.Start()
f(api)
dpa.Stop()
}

type testResponse struct {

8 changes: 4 additions & 4 deletions swarm/api/config.go
@@ -43,7 +43,7 @@ const (
type Config struct {
// serialised/persisted fields
*storage.StoreParams
*storage.ChunkerParams
*storage.DPAParams
*network.HiveParams
Swap *swap.SwapParams
//*network.SyncParams
@@ -72,9 +72,9 @@ type Config struct {
func NewConfig() (self *Config) {

self = &Config{
StoreParams: storage.NewDefaultStoreParams(),
ChunkerParams: storage.NewChunkerParams(),
HiveParams: network.NewHiveParams(),
StoreParams: storage.NewDefaultStoreParams(),
DPAParams: storage.NewDPAParams(),
HiveParams: network.NewHiveParams(),
//SyncParams: network.NewDefaultSyncParams(),
Swap: swap.NewDefaultSwapParams(),
ListenAddr: DefaultHTTPListenAddr,

2 changes: 1 addition & 1 deletion swarm/api/filesystem.go
@@ -114,7 +114,7 @@ func (self *FileSystem) Upload(lpath, index string) (string, error) {
stat, _ := f.Stat()
var hash storage.Key
var wait func()
hash, wait, err = self.api.dpa.Store(f, stat.Size())
hash, wait, err = self.api.dpa.Store(f, stat.Size(), false)
if hash != nil {
list[i].Hash = hash.Hex()
}

2 changes: 1 addition & 1 deletion swarm/api/http/server_test.go
@@ -246,7 +246,7 @@ func TestBzzGetPath(t *testing.T) {
for i, mf := range testmanifest {
reader[i] = bytes.NewReader([]byte(mf))
var wait func()
key[i], wait, err = srv.Dpa.Store(reader[i], int64(len(mf)))
key[i], wait, err = srv.Dpa.Store(reader[i], int64(len(mf)), false)
if err != nil {
t.Fatal(err)
}

2 changes: 1 addition & 1 deletion swarm/api/manifest.go
@@ -371,7 +371,7 @@ func (self *manifestTrie) recalcAndStore() error {
}

sr := bytes.NewReader(manifest)
key, wait, err2 := self.dpa.Store(sr, int64(len(manifest)))
key, wait, err2 := self.dpa.Store(sr, int64(len(manifest)), false)
wait()
self.hash = key
return err2

2 changes: 0 additions & 2 deletions swarm/fuse/swarmfs_test.go
@@ -813,8 +813,6 @@ func TestFUSE(t *testing.T) {
t.Fatal(err)
}
ta := &testAPI{api: api.NewApi(dpa, nil, nil)}
dpa.Start()
defer dpa.Stop()

t.Run("mountListAndUmount", ta.mountListAndUnmount)
t.Run("maxMounts", ta.maxMounts)

4 changes: 1 addition & 3 deletions swarm/network/stream/common_test.go
@@ -86,7 +86,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
go func() {
waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
}()
dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewChunkerParams())
dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewDPAParams())
return &TestRegistry{Registry: r, dpa: dpa}, nil
}

@@ -207,12 +207,10 @@ func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) {
}

func (r *TestRegistry) Start(server *p2p.Server) error {
r.dpa.Start()
return r.Registry.Start(server)
}

func (r *TestRegistry) Stop() error {
r.dpa.Stop()
return r.Registry.Stop()
}


16 changes: 5 additions & 11 deletions swarm/network/stream/delivery_test.go
@@ -341,13 +341,11 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}

// here we distribute chunks of a random file into Stores of nodes 1 to nodes
rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewChunkerParams())
rrdpa.Start()
rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewDPAParams())
size := chunkCount * chunkSize
fileHash, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size))
fileHash, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
// wait until all chunks stored
wait()
defer rrdpa.Stop()
if err != nil {
t.Fatal(err.Error())
}
@@ -395,11 +393,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return delivery.RequestFromPeers(chunk.Key[:], skipCheck)
}
netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
dpa := storage.NewDPA(netStore, storage.NewChunkerParams())
dpa.Start()
dpa := storage.NewDPA(netStore, storage.NewDPAParams())

go func() {
defer dpa.Stop()
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(dpa, fileHash)
@@ -518,9 +514,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
waitPeerErrC = make(chan error)

// create a dpa for the last node in the chain which we are gonna write to
remoteDpa := storage.NewDPA(sim.Stores[nodes-1], storage.NewChunkerParams())
remoteDpa.Start()
defer remoteDpa.Stop()
remoteDpa := storage.NewDPA(sim.Stores[nodes-1], storage.NewDPAParams())

// channel to signal simulation initialisation with action call complete
// or node disconnections
@@ -614,7 +608,7 @@ Loop:
hashes := make([]storage.Key, chunkCount)
for i := 0; i < chunkCount; i++ {
// create actual size real chunks
hash, wait, err := remoteDpa.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize))
hash, wait, err := remoteDpa.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
// wait until all chunks stored
wait()
if err != nil {

6 changes: 2 additions & 4 deletions swarm/network/stream/intervals_test.go
@@ -105,12 +105,10 @@ func testIntervals(t *testing.T, live bool, history *Range) {
return 1
}

dpa := storage.NewDPA(sim.Stores[0], storage.NewChunkerParams())
dpa.Start()
dpa := storage.NewDPA(sim.Stores[0], storage.NewDPAParams())
size := chunkCount * chunkSize
_, wait, err := dpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size))
_, wait, err := dpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
wait()
defer dpa.Stop()
if err != nil {
t.Fatal(err)
}

2 changes: 1 addition & 1 deletion swarm/network/stream/stream.go
@@ -93,7 +93,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
return NewSwarmChunkServer(delivery.db), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, _ string, _ bool) (Client, error) {
return NewSwarmSyncerClient(p, delivery.db, nil)
return NewSwarmSyncerClient(p, delivery.db)
})
RegisterSwarmSyncerServer(streamer, db)
RegisterSwarmSyncerClient(streamer, db)

73 changes: 35 additions & 38 deletions swarm/network/stream/syncer.go
@@ -17,10 +17,6 @@
package stream

import (
"bytes"
"errors"
"fmt"
"io"
"math"
"strconv"
"time"
@@ -138,17 +134,16 @@ type SwarmSyncerClient struct {
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
db *storage.DBAPI
chunker storage.Chunker
currentRoot storage.Key
requestFunc func(chunk *storage.Chunk)
end, start uint64
// chunker storage.Chunker
currentRoot storage.Key
requestFunc func(chunk *storage.Chunk)
end, start uint64
}

// NewSwarmSyncerClient is a contructor for provable data exchange syncer
func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI, chunker storage.Chunker) (*SwarmSyncerClient, error) {
func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
db: db,
chunker: chunker,
db: db,
}, nil
}

@@ -192,7 +187,7 @@ func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI, chunker storage.Chunker) (
// to handle incoming sync streams
func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, _ string, love bool) (Client, error) {
return NewSwarmSyncerClient(p, db, nil)
return NewSwarmSyncerClient(p, db)
})
}

@@ -211,37 +206,39 @@ func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) {

// BatchDone
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
if s.chunker != nil {
return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
}
// TODO: reenable this with putter/getter refactored code
// if s.chunker != nil {
// return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
// }
return nil
}

func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Key) (*TakeoverProof, error) {
// for provable syncer currentRoot is non-zero length
if s.chunker != nil {
if from > s.sessionAt { // for live syncing currentRoot is always updated
//expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC)
expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC)
if err != nil {
return nil, err
}
if !bytes.Equal(root, expRoot) {
return nil, fmt.Errorf("HandoverProof mismatch")
}
s.currentRoot = root
} else {
expHashes := make([]byte, len(hashes))
_, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize))
if err != nil && err != io.EOF {
return nil, err
}
if !bytes.Equal(expHashes, hashes) {
return nil, errors.New("invalid proof")
}
}
return nil, nil
}
// TODO: reenable this with putter/getter
// if s.chunker != nil {
// if from > s.sessionAt { // for live syncing currentRoot is always updated
// //expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC)
// expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC)
// if err != nil {
// return nil, err
// }
// if !bytes.Equal(root, expRoot) {
// return nil, fmt.Errorf("HandoverProof mismatch")
// }
// s.currentRoot = root
// } else {
// expHashes := make([]byte, len(hashes))
// _, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize))
// if err != nil && err != io.EOF {
// return nil, err
// }
// if !bytes.Equal(expHashes, hashes) {
// return nil, errors.New("invalid proof")
// }
// }
// return nil, nil
// }
s.end += uint64(len(hashes)) / HashSize
takeover := &Takeover{
Stream: stream,

6 changes: 2 additions & 4 deletions swarm/network/stream/syncer_test.go
@@ -94,13 +94,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
waitPeerErrC = make(chan error)

// here we distribute chunks of a random file into stores 1...nodes
rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewChunkerParams())
rrdpa.Start()
rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewDPAParams())
size := chunkCount * chunkSize
_, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size))
_, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
// need to wait cos we then immediately collect the relevant bin content
wait()
defer rrdpa.Stop()
if err != nil {
t.Fatal(err.Error())
}

2 changes: 1 addition & 1 deletion swarm/pss/pss.go
@@ -775,7 +775,7 @@ func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool {
// DPA storage handler for message cache
func (self *Pss) storeMsg(msg *PssMsg) (pssDigest, error) {
buf := bytes.NewReader(msg.serialize())
key, _, err := self.dpa.Store(buf, int64(buf.Len()))
key, _, err := self.dpa.Store(buf, int64(buf.Len()), false)
if err != nil {
log.Warn("Could not store in swarm", "err", err)
return pssDigest{}, err