This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Integrate encryption into dpa and chunker #327

Merged on Mar 30, 2018
Commits (30)
9d1574c
swarm: Integrate encryption into dpa and chunker
gbalint Mar 14, 2018
c9e5f37
swarm/storage: Use TreeChunker in DPA
gbalint Mar 14, 2018
5e70f70
swarm/storage: HasherStore needs a new hasher instance on every chunk
gbalint Mar 14, 2018
1827520
swarm/storage: Add back tree/pyramid comparison in chunker test
gbalint Mar 14, 2018
f32d4cc
swarm: Refactor chunker, new chunker instance is needed for each job
gbalint Mar 26, 2018
db1b51a
swarm/storage: Remove commented code
gbalint Mar 26, 2018
8016f18
swarm/storage: Added comments
gbalint Mar 26, 2018
8a6cf6f
cmd/swarm: Remove Branches from config
gbalint Mar 26, 2018
132b3fc
swarm/storage: Remove commented code
gbalint Mar 26, 2018
fca1f30
swarm/network: Remove chunker from syncer
gbalint Mar 26, 2018
424b11d
swarm/storage: Remove Chunker/Splitter/Joiner interfaces
gbalint Mar 26, 2018
2329cfb
swarm/storage: Fix comment
gbalint Mar 26, 2018
d8ee6a1
swarm/storage: Reenable chunker_test.TestDataAppend
gbalint Mar 27, 2018
46e9832
swarm/storage: Fix comments
gbalint Mar 27, 2018
c03ae51
swarm/storage, cmd/swarm: Add FakeChunkStore
gbalint Mar 27, 2018
68cf01e
swarm/storage: Use DefaultChunkSize as constant, not DefaultBranches
gbalint Mar 28, 2018
49a4d9c
swarm/storage: Remove unnecessary return statement
gbalint Mar 28, 2018
3358a59
swarm/storage: Rename unfinishedChunk to unfinishedChunkData
gbalint Mar 28, 2018
484e22c
swarm/storage: Move NewTreeSplitterParams to chunker.go
gbalint Mar 28, 2018
b0ab49f
swarm/storage: Minor cleanup
gbalint Mar 28, 2018
582c6e5
swarm/storage: Add TODO comment to chunker depth parameter
gbalint Mar 28, 2018
534217f
swarm/storage: Fix padding in hasherStore
gbalint Mar 29, 2018
cdc7bed
swarm/storage: Add unit test for hasherStore
gbalint Mar 29, 2018
b737d49
swarm/storage: Decrease initial counter in span encryption
gbalint Mar 29, 2018
c59a6a4
swarm/storage: Renames in benchmark tests
gbalint Mar 29, 2018
bc32751
swarm/storage: Fixed pyramid benchmark test
gbalint Mar 29, 2018
ba258d7
swarm/storage: Convert TestHasherStore to table-driven test
gbalint Mar 29, 2018
6c5a075
swarm/storage, cmd/swarm: New MapChunkStore, simple ChunkStore implem…
gbalint Mar 29, 2018
0bfe320
swarm/storage: Cleanup on hashSize/chunkSize handling
gbalint Mar 29, 2018
e187fd0
Merge branch 'swarm-network-rewrite' into swarm-network-rewrite-encry…
gbalint Mar 30, 2018
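Taken together, the commits above route every chunk through a hasherStore that optionally encrypts the chunk (including its 8-byte span, per the padding and span-encryption fixes) before hashing and storing it. The Go sketch below is a conceptual illustration only — the type, the method names and the XOR placeholder cipher are stand-ins, not this PR's actual API:

    package sketch

    import (
        "crypto/rand"
        "crypto/sha256"
    )

    // ChunkStore is a minimal stand-in for swarm/storage.ChunkStore.
    type ChunkStore interface {
        Put(key, data []byte)
    }

    // hasherStore sketches the component the commits describe: every chunk
    // is (optionally) encrypted, then hashed, then handed to the store.
    type hasherStore struct {
        store     ChunkStore
        toEncrypt bool
    }

    // Put returns the storage key of the (possibly encrypted) chunk and the
    // freshly generated decryption key (nil when encryption is off).
    func (h *hasherStore) Put(data []byte) (key, encKey []byte, err error) {
        payload := data
        if h.toEncrypt {
            encKey = make([]byte, 32)
            if _, err = rand.Read(encKey); err != nil {
                return nil, nil, err
            }
            // Placeholder cipher: the PR derives a per-chunk keystream (and
            // encrypts the span too); raw XOR just keeps the sketch short.
            payload = make([]byte, len(data))
            for i := range data {
                payload[i] = data[i] ^ encKey[i%len(encKey)]
            }
        }
        sum := sha256.Sum256(payload) // the real code uses Swarm's chunk hasher
        h.store.Put(sum[:], payload)
        return sum[:], encKey, nil
    }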
10 changes: 0 additions & 10 deletions cmd/swarm/config_test.go
@@ -157,7 +157,6 @@ func TestConfigFileOverrides(t *testing.T) {
defaultConf.NetworkId = 54
defaultConf.Port = httpPort
defaultConf.StoreParams.DbCapacity = 9000000
-defaultConf.ChunkerParams.Branches = 64
defaultConf.HiveParams.KeepAliveInterval = 6000000000
defaultConf.Swap.Params.Strategy.AutoCashInterval = 600 * time.Second
//defaultConf.SyncParams.KeyBufferSize = 512
@@ -237,10 +236,6 @@ func TestConfigFileOverrides(t *testing.T) {
t.Fatalf("Expected network ID to be %d, got %d", 54, info.NetworkId)
}

-if info.ChunkerParams.Branches != 64 {
-t.Fatalf("Expected chunker params branches to be %d, got %d", 64, info.ChunkerParams.Branches)
-}
-
if info.HiveParams.KeepAliveInterval != 6000000000 {
t.Fatalf("Expected HiveParams KeepAliveInterval to be %d, got %d", uint64(6000000000), uint64(info.HiveParams.KeepAliveInterval))
}
@@ -374,7 +369,6 @@ func TestConfigCmdLineOverridesFile(t *testing.T) {
defaultConf.NetworkId = 54
defaultConf.Port = "8588"
defaultConf.StoreParams.DbCapacity = 9000000
-defaultConf.ChunkerParams.Branches = 64
defaultConf.HiveParams.KeepAliveInterval = 6000000000
defaultConf.Swap.Params.Strategy.AutoCashInterval = 600 * time.Second
//defaultConf.SyncParams.KeyBufferSize = 512
@@ -456,10 +450,6 @@ func TestConfigCmdLineOverridesFile(t *testing.T) {
t.Fatalf("Expected network ID to be %d, got %d", 54, info.NetworkId)
}

-if info.ChunkerParams.Branches != 64 {
-t.Fatalf("Expected chunker params branches to be %d, got %d", 64, info.ChunkerParams.Branches)
-}
-
if info.HiveParams.KeepAliveInterval != 6000000000 {
t.Fatalf("Expected HiveParams KeepAliveInterval to be %d, got %d", uint64(6000000000), uint64(info.HiveParams.KeepAliveInterval))
}
4 changes: 2 additions & 2 deletions cmd/swarm/hash.go
@@ -38,8 +38,8 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
-chunker := storage.NewTreeChunker(storage.NewChunkerParams())
-key, _, err := chunker.Split(f, stat.Size(), nil)
+dpa := storage.NewDPA(storage.NewMapChunkStore(), storage.NewDPAParams())
+key, _, err := dpa.Store(f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
} else {
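The hash.go change above is the pattern repeated throughout this PR: instead of building a TreeChunker and calling Split, callers construct a DPA over a ChunkStore and call Store, whose new third argument toggles encryption (every call site in this diff passes false). Below is a minimal round-trip sketch using the constructors from the diff; the flag's meaning and the Retrieve reader usage are assumptions, not verified against this branch:

    package main

    import (
        "fmt"
        "io"
        "strings"

        "github.com/ethereum/go-ethereum/swarm/storage"
    )

    func main() {
        // In-memory ChunkStore plus default params, as in the new hash.go.
        dpa := storage.NewDPA(storage.NewMapChunkStore(), storage.NewDPAParams())

        data := "hello swarm"
        key, wait, err := dpa.Store(strings.NewReader(data), int64(len(data)), false)
        if err != nil {
            panic(err)
        }
        wait() // block until every chunk has been written to the store

        // Retrieve is assumed to return a lazy reader over the joined chunks.
        buf := make([]byte, len(data))
        if _, err := dpa.Retrieve(key).ReadAt(buf, 0); err != nil && err != io.EOF {
            panic(err)
        }
        fmt.Println(string(buf)) // hello swarm
    }

The same Store signature shows up unchanged in api.go, filesystem.go, manifest.go and pss.go below.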
6 changes: 3 additions & 3 deletions swarm/api/api.go
@@ -243,7 +243,7 @@ func (self *Api) Retrieve(key storage.Key) storage.LazySectionReader {

func (self *Api) Store(data io.Reader, size int64) (key storage.Key, wait func(), err error) {
log.Debug("api.store", "size", size)
-return self.dpa.Store(data, size)
+return self.dpa.Store(data, size, false)
}

type ErrResolve error
@@ -286,14 +286,14 @@ func (self *Api) Resolve(uri *URI) (storage.Key, error) {
func (self *Api) Put(content, contentType string) (k storage.Key, wait func(), err error) {
apiPutCount.Inc(1)
r := strings.NewReader(content)
-key, waitContent, err := self.dpa.Store(r, int64(len(content)))
+key, waitContent, err := self.dpa.Store(r, int64(len(content)), false)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
-key, waitManifest, err := self.dpa.Store(r, int64(len(manifest)))
+key, waitManifest, err := self.dpa.Store(r, int64(len(manifest)), false)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
2 changes: 0 additions & 2 deletions swarm/api/api_test.go
@@ -43,9 +43,7 @@ func testApi(t *testing.T, f func(*Api)) {
return
}
api := NewApi(dpa, nil, nil)
-dpa.Start()
f(api)
-dpa.Stop()
}

type testResponse struct {
8 changes: 4 additions & 4 deletions swarm/api/config.go
@@ -43,7 +43,7 @@ const (
type Config struct {
// serialised/persisted fields
*storage.StoreParams
-*storage.ChunkerParams
+*storage.DPAParams
*network.HiveParams
Swap *swap.SwapParams
//*network.SyncParams
@@ -72,9 +72,9 @@ type Config struct {
func NewConfig() (self *Config) {

self = &Config{
-StoreParams: storage.NewDefaultStoreParams(),
-ChunkerParams: storage.NewChunkerParams(),
-HiveParams: network.NewHiveParams(),
+StoreParams: storage.NewDefaultStoreParams(),
+DPAParams: storage.NewDPAParams(),
+HiveParams: network.NewHiveParams(),
//SyncParams: network.NewDefaultSyncParams(),
Swap: swap.NewDefaultSwapParams(),
ListenAddr: DefaultHTTPListenAddr,
2 changes: 1 addition & 1 deletion swarm/api/filesystem.go
@@ -114,7 +114,7 @@ func (self *FileSystem) Upload(lpath, index string) (string, error) {
stat, _ := f.Stat()
var hash storage.Key
var wait func()
-hash, wait, err = self.api.dpa.Store(f, stat.Size())
+hash, wait, err = self.api.dpa.Store(f, stat.Size(), false)
if hash != nil {
list[i].Hash = hash.Hex()
}
2 changes: 1 addition & 1 deletion swarm/api/http/server_test.go
@@ -246,7 +246,7 @@ func TestBzzGetPath(t *testing.T) {
for i, mf := range testmanifest {
reader[i] = bytes.NewReader([]byte(mf))
var wait func()
-key[i], wait, err = srv.Dpa.Store(reader[i], int64(len(mf)))
+key[i], wait, err = srv.Dpa.Store(reader[i], int64(len(mf)), false)
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion swarm/api/manifest.go
@@ -371,7 +371,7 @@ func (self *manifestTrie) recalcAndStore() error {
}

sr := bytes.NewReader(manifest)
-key, wait, err2 := self.dpa.Store(sr, int64(len(manifest)))
+key, wait, err2 := self.dpa.Store(sr, int64(len(manifest)), false)
wait()
self.hash = key
return err2
2 changes: 0 additions & 2 deletions swarm/fuse/swarmfs_test.go
@@ -813,8 +813,6 @@ func TestFUSE(t *testing.T) {
t.Fatal(err)
}
ta := &testAPI{api: api.NewApi(dpa, nil, nil)}
-dpa.Start()
-defer dpa.Stop()

t.Run("mountListAndUmount", ta.mountListAndUnmount)
t.Run("maxMounts", ta.maxMounts)
4 changes: 1 addition & 3 deletions swarm/network/stream/common_test.go
@@ -86,7 +86,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
go func() {
waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
}()
-dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewChunkerParams())
+dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewDPAParams())
return &TestRegistry{Registry: r, dpa: dpa}, nil
}

@@ -207,12 +207,10 @@ func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) {
}

func (r *TestRegistry) Start(server *p2p.Server) error {
-r.dpa.Start()
return r.Registry.Start(server)
}

func (r *TestRegistry) Stop() error {
-r.dpa.Stop()
return r.Registry.Stop()
}

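One more pattern visible in this and the surrounding test diffs (api_test.go, swarmfs_test.go, and delivery_test.go, intervals_test.go, syncer_test.go below): the DPA no longer has a Start/Stop lifecycle. Per the commit "swarm: Refactor chunker, new chunker instance is needed for each job", each Store/Retrieve builds its own chunker rather than sharing long-running workers, so a DPA is usable immediately after construction. A sketch of the setup change — the ChunkStore parameter and *storage.DPA return type are assumed from the surrounding code:

    package sketch

    import "github.com/ethereum/go-ethereum/swarm/storage"

    // newTestDpa shows the setup change: the Start/Stop pair is gone.
    func newTestDpa(store storage.ChunkStore) *storage.DPA {
        // Before this PR:
        //   dpa := storage.NewDPA(store, storage.NewChunkerParams())
        //   dpa.Start()
        //   defer dpa.Stop()
        // After: construction is enough; each Store/Retrieve job creates its
        // own chunker internally, so there is no shared lifecycle to manage.
        return storage.NewDPA(store, storage.NewDPAParams())
    }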
16 changes: 5 additions & 11 deletions swarm/network/stream/delivery_test.go
@@ -341,13 +341,11 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}

// here we distribute chunks of a random file into Stores of nodes 1 to nodes
-rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewChunkerParams())
-rrdpa.Start()
+rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewDPAParams())
size := chunkCount * chunkSize
-fileHash, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size))
+fileHash, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
// wait until all chunks stored
wait()
-defer rrdpa.Stop()
if err != nil {
t.Fatal(err.Error())
}
@@ -395,11 +393,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return delivery.RequestFromPeers(chunk.Key[:], skipCheck)
}
netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
-dpa := storage.NewDPA(netStore, storage.NewChunkerParams())
-dpa.Start()
+dpa := storage.NewDPA(netStore, storage.NewDPAParams())

go func() {
-defer dpa.Stop()
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(dpa, fileHash)
@@ -518,9 +514,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
waitPeerErrC = make(chan error)

// create a dpa for the last node in the chain which we are gonna write to
-remoteDpa := storage.NewDPA(sim.Stores[nodes-1], storage.NewChunkerParams())
-remoteDpa.Start()
-defer remoteDpa.Stop()
+remoteDpa := storage.NewDPA(sim.Stores[nodes-1], storage.NewDPAParams())

// channel to signal simulation initialisation with action call complete
// or node disconnections
@@ -614,7 +608,7 @@ Loop:
hashes := make([]storage.Key, chunkCount)
for i := 0; i < chunkCount; i++ {
// create actual size real chunks
-hash, wait, err := remoteDpa.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize))
+hash, wait, err := remoteDpa.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
// wait until all chunks stored
wait()
if err != nil {
6 changes: 2 additions & 4 deletions swarm/network/stream/intervals_test.go
@@ -105,12 +105,10 @@ func testIntervals(t *testing.T, live bool, history *Range) {
return 1
}

-dpa := storage.NewDPA(sim.Stores[0], storage.NewChunkerParams())
-dpa.Start()
+dpa := storage.NewDPA(sim.Stores[0], storage.NewDPAParams())
size := chunkCount * chunkSize
-_, wait, err := dpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size))
+_, wait, err := dpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
wait()
-defer dpa.Stop()
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion swarm/network/stream/stream.go
@@ -93,7 +93,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
return NewSwarmChunkServer(delivery.db), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, _ string, _ bool) (Client, error) {
-return NewSwarmSyncerClient(p, delivery.db, nil)
+return NewSwarmSyncerClient(p, delivery.db)
})
RegisterSwarmSyncerServer(streamer, db)
RegisterSwarmSyncerClient(streamer, db)
73 changes: 35 additions & 38 deletions swarm/network/stream/syncer.go
@@ -17,10 +17,6 @@
package stream

import (
-"bytes"
-"errors"
-"fmt"
-"io"
"math"
"strconv"
"time"
@@ -138,17 +134,16 @@ type SwarmSyncerClient struct {
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
db *storage.DBAPI
-chunker storage.Chunker
-currentRoot storage.Key
-requestFunc func(chunk *storage.Chunk)
-end, start uint64
+// chunker storage.Chunker
+currentRoot storage.Key
+requestFunc func(chunk *storage.Chunk)
+end, start uint64
}

// NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI, chunker storage.Chunker) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
-db: db,
-chunker: chunker,
+db: db,
}, nil
}

@@ -192,7 +187,7 @@ func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI, chunker storage.Chunker) (
// to handle incoming sync streams
func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, _ string, love bool) (Client, error) {
-return NewSwarmSyncerClient(p, db, nil)
+return NewSwarmSyncerClient(p, db)
})
}

@@ -211,37 +206,39 @@ func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) {

// BatchDone
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
-if s.chunker != nil {
-return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
-}
+// TODO: reenable this with putter/getter refactored code
+// if s.chunker != nil {
+// return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
+// }
return nil
}

func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Key) (*TakeoverProof, error) {
-// for provable syncer currentRoot is non-zero length
-if s.chunker != nil {
-if from > s.sessionAt { // for live syncing currentRoot is always updated
-//expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC)
-expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC)
-if err != nil {
-return nil, err
-}
-if !bytes.Equal(root, expRoot) {
-return nil, fmt.Errorf("HandoverProof mismatch")
-}
-s.currentRoot = root
-} else {
-expHashes := make([]byte, len(hashes))
-_, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize))
-if err != nil && err != io.EOF {
-return nil, err
-}
-if !bytes.Equal(expHashes, hashes) {
-return nil, errors.New("invalid proof")
-}
-}
-return nil, nil
-}
+// TODO: reenable this with putter/getter
+// if s.chunker != nil {
+// if from > s.sessionAt { // for live syncing currentRoot is always updated
+// //expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC)
+// expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC)
+// if err != nil {
+// return nil, err
+// }
+// if !bytes.Equal(root, expRoot) {
+// return nil, fmt.Errorf("HandoverProof mismatch")
+// }
+// s.currentRoot = root
+// } else {
+// expHashes := make([]byte, len(hashes))
+// _, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize))
+// if err != nil && err != io.EOF {
+// return nil, err
+// }
+// if !bytes.Equal(expHashes, hashes) {
+// return nil, errors.New("invalid proof")
+// }
+// }
+// return nil, nil
+// }
s.end += uint64(len(hashes)) / HashSize
takeover := &Takeover{
Stream: stream,
6 changes: 2 additions & 4 deletions swarm/network/stream/syncer_test.go
@@ -94,13 +94,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
waitPeerErrC = make(chan error)

// here we distribute chunks of a random file into stores 1...nodes
-rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewChunkerParams())
-rrdpa.Start()
+rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewDPAParams())
size := chunkCount * chunkSize
-_, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size))
+_, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
// need to wait cos we then immediately collect the relevant bin content
wait()
-defer rrdpa.Stop()
if err != nil {
t.Fatal(err.Error())
}
2 changes: 1 addition & 1 deletion swarm/pss/pss.go
@@ -775,7 +775,7 @@ func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool {
// DPA storage handler for message cache
func (self *Pss) storeMsg(msg *PssMsg) (pssDigest, error) {
buf := bytes.NewReader(msg.serialize())
-key, _, err := self.dpa.Store(buf, int64(buf.Len()))
+key, _, err := self.dpa.Store(buf, int64(buf.Len()), false)
if err != nil {
log.Warn("Could not store in swarm", "err", err)
return pssDigest{}, err