diff --git a/cmd/swarm/config_test.go b/cmd/swarm/config_test.go index 57396d1408e2..266e0a714645 100644 --- a/cmd/swarm/config_test.go +++ b/cmd/swarm/config_test.go @@ -157,7 +157,6 @@ func TestConfigFileOverrides(t *testing.T) { defaultConf.NetworkId = 54 defaultConf.Port = httpPort defaultConf.StoreParams.DbCapacity = 9000000 - defaultConf.ChunkerParams.Branches = 64 defaultConf.HiveParams.KeepAliveInterval = 6000000000 defaultConf.Swap.Params.Strategy.AutoCashInterval = 600 * time.Second //defaultConf.SyncParams.KeyBufferSize = 512 @@ -237,10 +236,6 @@ func TestConfigFileOverrides(t *testing.T) { t.Fatalf("Expected network ID to be %d, got %d", 54, info.NetworkId) } - if info.ChunkerParams.Branches != 64 { - t.Fatalf("Expected chunker params branches to be %d, got %d", 64, info.ChunkerParams.Branches) - } - if info.HiveParams.KeepAliveInterval != 6000000000 { t.Fatalf("Expected HiveParams KeepAliveInterval to be %d, got %d", uint64(6000000000), uint64(info.HiveParams.KeepAliveInterval)) } @@ -374,7 +369,6 @@ func TestConfigCmdLineOverridesFile(t *testing.T) { defaultConf.NetworkId = 54 defaultConf.Port = "8588" defaultConf.StoreParams.DbCapacity = 9000000 - defaultConf.ChunkerParams.Branches = 64 defaultConf.HiveParams.KeepAliveInterval = 6000000000 defaultConf.Swap.Params.Strategy.AutoCashInterval = 600 * time.Second //defaultConf.SyncParams.KeyBufferSize = 512 @@ -456,10 +450,6 @@ func TestConfigCmdLineOverridesFile(t *testing.T) { t.Fatalf("Expected network ID to be %d, got %d", 54, info.NetworkId) } - if info.ChunkerParams.Branches != 64 { - t.Fatalf("Expected chunker params branches to be %d, got %d", 64, info.ChunkerParams.Branches) - } - if info.HiveParams.KeepAliveInterval != 6000000000 { t.Fatalf("Expected HiveParams KeepAliveInterval to be %d, got %d", uint64(6000000000), uint64(info.HiveParams.KeepAliveInterval)) } diff --git a/cmd/swarm/hash.go b/cmd/swarm/hash.go index a6e6a6ba7651..5fa95740d0bd 100644 --- a/cmd/swarm/hash.go +++ b/cmd/swarm/hash.go @@ -38,8 +38,8 @@ func hash(ctx *cli.Context) { defer f.Close() stat, _ := f.Stat() - chunker := storage.NewTreeChunker(storage.NewChunkerParams()) - key, _, err := chunker.Split(f, stat.Size(), nil) + dpa := storage.NewDPA(storage.NewMapChunkStore(), storage.NewDPAParams()) + key, _, err := dpa.Store(f, stat.Size(), false) if err != nil { utils.Fatalf("%v\n", err) } else { diff --git a/swarm/api/api.go b/swarm/api/api.go index f070c6c6421c..be09809613ad 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -243,7 +243,7 @@ func (self *Api) Retrieve(key storage.Key) storage.LazySectionReader { func (self *Api) Store(data io.Reader, size int64) (key storage.Key, wait func(), err error) { log.Debug("api.store", "size", size) - return self.dpa.Store(data, size) + return self.dpa.Store(data, size, false) } type ErrResolve error @@ -286,14 +286,14 @@ func (self *Api) Resolve(uri *URI) (storage.Key, error) { func (self *Api) Put(content, contentType string) (k storage.Key, wait func(), err error) { apiPutCount.Inc(1) r := strings.NewReader(content) - key, waitContent, err := self.dpa.Store(r, int64(len(content))) + key, waitContent, err := self.dpa.Store(r, int64(len(content)), false) if err != nil { apiPutFail.Inc(1) return nil, nil, err } manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) r = strings.NewReader(manifest) - key, waitManifest, err := self.dpa.Store(r, int64(len(manifest))) + key, waitManifest, err := self.dpa.Store(r, int64(len(manifest)), false) if err != nil { 
apiPutFail.Inc(1) return nil, nil, err diff --git a/swarm/api/api_test.go b/swarm/api/api_test.go index 49f12bb3a6c7..abbc8c0e96cc 100644 --- a/swarm/api/api_test.go +++ b/swarm/api/api_test.go @@ -43,9 +43,7 @@ func testApi(t *testing.T, f func(*Api)) { return } api := NewApi(dpa, nil, nil) - dpa.Start() f(api) - dpa.Stop() } type testResponse struct { diff --git a/swarm/api/config.go b/swarm/api/config.go index de58f1d4c8e6..3f93f8f930ef 100644 --- a/swarm/api/config.go +++ b/swarm/api/config.go @@ -43,7 +43,7 @@ const ( type Config struct { // serialised/persisted fields *storage.StoreParams - *storage.ChunkerParams + *storage.DPAParams *network.HiveParams Swap *swap.SwapParams //*network.SyncParams @@ -72,9 +72,9 @@ type Config struct { func NewConfig() (self *Config) { self = &Config{ - StoreParams: storage.NewDefaultStoreParams(), - ChunkerParams: storage.NewChunkerParams(), - HiveParams: network.NewHiveParams(), + StoreParams: storage.NewDefaultStoreParams(), + DPAParams: storage.NewDPAParams(), + HiveParams: network.NewHiveParams(), //SyncParams: network.NewDefaultSyncParams(), Swap: swap.NewDefaultSwapParams(), ListenAddr: DefaultHTTPListenAddr, diff --git a/swarm/api/filesystem.go b/swarm/api/filesystem.go index 0074ea167de4..8de4f4ee3e20 100644 --- a/swarm/api/filesystem.go +++ b/swarm/api/filesystem.go @@ -114,7 +114,7 @@ func (self *FileSystem) Upload(lpath, index string) (string, error) { stat, _ := f.Stat() var hash storage.Key var wait func() - hash, wait, err = self.api.dpa.Store(f, stat.Size()) + hash, wait, err = self.api.dpa.Store(f, stat.Size(), false) if hash != nil { list[i].Hash = hash.Hex() } diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index 4cbc3c30edb7..292da700871b 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -246,7 +246,7 @@ func TestBzzGetPath(t *testing.T) { for i, mf := range testmanifest { reader[i] = bytes.NewReader([]byte(mf)) var wait func() - key[i], wait, err = srv.Dpa.Store(reader[i], int64(len(mf))) + key[i], wait, err = srv.Dpa.Store(reader[i], int64(len(mf)), false) if err != nil { t.Fatal(err) } diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go index 2f9771cbb929..aaf0035d6254 100644 --- a/swarm/api/manifest.go +++ b/swarm/api/manifest.go @@ -371,7 +371,7 @@ func (self *manifestTrie) recalcAndStore() error { } sr := bytes.NewReader(manifest) - key, wait, err2 := self.dpa.Store(sr, int64(len(manifest))) + key, wait, err2 := self.dpa.Store(sr, int64(len(manifest)), false) wait() self.hash = key return err2 diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go index 4d89b132a1bc..0186749522c5 100644 --- a/swarm/fuse/swarmfs_test.go +++ b/swarm/fuse/swarmfs_test.go @@ -813,8 +813,6 @@ func TestFUSE(t *testing.T) { t.Fatal(err) } ta := &testAPI{api: api.NewApi(dpa, nil, nil)} - dpa.Start() - defer dpa.Stop() t.Run("mountListAndUmount", ta.mountListAndUnmount) t.Run("maxMounts", ta.maxMounts) diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 998b6adf855a..cf19e9bc2de9 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -86,7 +86,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { go func() { waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id)) }() - dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewChunkerParams()) + dpa := storage.NewDPA(storage.NewNetStore(store, nil), storage.NewDPAParams()) return &TestRegistry{Registry: 
r, dpa: dpa}, nil } @@ -207,12 +207,10 @@ func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) { } func (r *TestRegistry) Start(server *p2p.Server) error { - r.dpa.Start() return r.Registry.Start(server) } func (r *TestRegistry) Stop() error { - r.dpa.Stop() return r.Registry.Stop() } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 16273d15af5a..c5b791bd50e1 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -341,13 +341,11 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck } // here we distribute chunks of a random file into Stores of nodes 1 to nodes - rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewChunkerParams()) - rrdpa.Start() + rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewDPAParams()) size := chunkCount * chunkSize - fileHash, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size)) + fileHash, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) // wait until all chunks stored wait() - defer rrdpa.Stop() if err != nil { t.Fatal(err.Error()) } @@ -395,11 +393,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck return delivery.RequestFromPeers(chunk.Key[:], skipCheck) } netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) - dpa := storage.NewDPA(netStore, storage.NewChunkerParams()) - dpa.Start() + dpa := storage.NewDPA(netStore, storage.NewDPAParams()) go func() { - defer dpa.Stop() // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks // we must wait for the peer connections to have started before requesting n, err := readAll(dpa, fileHash) @@ -518,9 +514,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip waitPeerErrC = make(chan error) // create a dpa for the last node in the chain which we are gonna write to - remoteDpa := storage.NewDPA(sim.Stores[nodes-1], storage.NewChunkerParams()) - remoteDpa.Start() - defer remoteDpa.Stop() + remoteDpa := storage.NewDPA(sim.Stores[nodes-1], storage.NewDPAParams()) // channel to signal simulation initialisation with action call complete // or node disconnections @@ -614,7 +608,7 @@ Loop: hashes := make([]storage.Key, chunkCount) for i := 0; i < chunkCount; i++ { // create actual size real chunks - hash, wait, err := remoteDpa.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize)) + hash, wait, err := remoteDpa.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) // wait until all chunks stored wait() if err != nil { diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 4d05b866e0f9..d0de0399394f 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -105,12 +105,10 @@ func testIntervals(t *testing.T, live bool, history *Range) { return 1 } - dpa := storage.NewDPA(sim.Stores[0], storage.NewChunkerParams()) - dpa.Start() + dpa := storage.NewDPA(sim.Stores[0], storage.NewDPAParams()) size := chunkCount * chunkSize - _, wait, err := dpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size)) + _, wait, err := dpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) wait() - defer dpa.Stop() if err != nil { t.Fatal(err) } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 
d5e5927195fc..96f452416108 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -93,7 +93,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i return NewSwarmChunkServer(delivery.db), nil }) streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, _ string, _ bool) (Client, error) { - return NewSwarmSyncerClient(p, delivery.db, nil) + return NewSwarmSyncerClient(p, delivery.db) }) RegisterSwarmSyncerServer(streamer, db) RegisterSwarmSyncerClient(streamer, db) diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 74690d5a2b9c..4121954f1985 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -17,10 +17,6 @@ package stream import ( - "bytes" - "errors" - "fmt" - "io" "math" "strconv" "time" @@ -138,17 +134,16 @@ type SwarmSyncerClient struct { retrieveC chan *storage.Chunk storeC chan *storage.Chunk db *storage.DBAPI - chunker storage.Chunker - currentRoot storage.Key - requestFunc func(chunk *storage.Chunk) - end, start uint64 + // chunker storage.Chunker + currentRoot storage.Key + requestFunc func(chunk *storage.Chunk) + end, start uint64 } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI, chunker storage.Chunker) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - db: db, - chunker: chunker, + db: db, }, nil } @@ -192,7 +187,7 @@ func NewSwarmSyncerClient(_ *Peer, db *storage.DBAPI, chunker storage.Chunker) ( // to handle incoming sync streams func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { streamer.RegisterClientFunc("SYNC", func(p *Peer, _ string, love bool) (Client, error) { - return NewSwarmSyncerClient(p, db, nil) + return NewSwarmSyncerClient(p, db) }) } @@ -211,37 +206,39 @@ func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) { // BatchDone func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) { - if s.chunker != nil { - return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) } - } + // TODO: reenable this with putter/getter refactored code + // if s.chunker != nil { + // return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) } + // } return nil } func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Key) (*TakeoverProof, error) { // for provable syncer currentRoot is non-zero length - if s.chunker != nil { - if from > s.sessionAt { // for live syncing currentRoot is always updated - //expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC) - expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC) - if err != nil { - return nil, err - } - if !bytes.Equal(root, expRoot) { - return nil, fmt.Errorf("HandoverProof mismatch") - } - s.currentRoot = root - } else { - expHashes := make([]byte, len(hashes)) - _, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize)) - if err != nil && err != io.EOF { - return nil, err - } - if !bytes.Equal(expHashes, hashes) { - return nil, errors.New("invalid proof") - } - } - return nil, nil - } + // TODO: reenable this with putter/getter + // if s.chunker != nil { + // if from > s.sessionAt { // for live syncing currentRoot is always updated + // 
//expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC) + // expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC) + // if err != nil { + // return nil, err + // } + // if !bytes.Equal(root, expRoot) { + // return nil, fmt.Errorf("HandoverProof mismatch") + // } + // s.currentRoot = root + // } else { + // expHashes := make([]byte, len(hashes)) + // _, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize)) + // if err != nil && err != io.EOF { + // return nil, err + // } + // if !bytes.Equal(expHashes, hashes) { + // return nil, errors.New("invalid proof") + // } + // } + // return nil, nil + // } s.end += uint64(len(hashes)) / HashSize takeover := &Takeover{ Stream: stream, diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 9c8e7f912561..ad4d9634c0f9 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -94,13 +94,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck waitPeerErrC = make(chan error) // here we distribute chunks of a random file into stores 1...nodes - rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewChunkerParams()) - rrdpa.Start() + rrdpa := storage.NewDPA(newRoundRobinStore(sim.Stores[1:]...), storage.NewDPAParams()) size := chunkCount * chunkSize - _, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size)) + _, wait, err := rrdpa.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) // need to wait cos we then immediately collect the relevant bin content wait() - defer rrdpa.Stop() if err != nil { t.Fatal(err.Error()) } diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index d2d02898a62d..36bf01e46c40 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -775,7 +775,7 @@ func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool { // DPA storage handler for message cache func (self *Pss) storeMsg(msg *PssMsg) (pssDigest, error) { buf := bytes.NewReader(msg.serialize()) - key, _, err := self.dpa.Store(buf, int64(buf.Len())) + key, _, err := self.dpa.Store(buf, int64(buf.Len()), false) if err != nil { log.Warn("Could not store in swarm", "err", err) return pssDigest{}, err diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index d13953022212..c2bac9859324 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -70,30 +70,141 @@ var ( newChunkCounter = metrics.NewRegisteredCounter("storage.chunks.new", nil) ) +const ( + DefaultChunkSize int64 = 4096 +) + +type ChunkerParams struct { + chunkSize int64 + hashSize int64 +} + +type SplitterParams struct { + ChunkerParams + reader io.Reader + putter Putter + key Key +} + +type TreeSplitterParams struct { + SplitterParams + size int64 +} + +type JoinerParams struct { + ChunkerParams + key Key + getter Getter + // TODO: there is a bug, so depth can only be 0 today, see: https://github.com/ethersphere/go-ethereum/issues/344 + depth int +} + type TreeChunker struct { branches int64 hashFunc SwarmHasher + dataSize int64 + data io.Reader // calculated + key Key + depth int hashSize int64 // self.hashFunc.New().Size() chunkSize int64 // hashSize* branches workerCount int64 // the number of worker routines used workerLock sync.RWMutex // lock for the worker count + jobC chan *hashJob + wg *sync.WaitGroup + putter Putter + getter Getter + errC chan error + quitC chan bool +} + +/* + Join reconstructs original content based on a root key. 
+ When joining, the caller gets returned a Lazy SectionReader, which is + seekable and implements on-demand fetching of chunks as and where it is read. + New chunks to retrieve are coming from the getter, which the caller provides. + If an error is encountered during joining, it appears as a reader error + on the returned SectionReader. + As a result, partial reads from a document are possible even if other parts + are corrupt or lost. + The chunks are not meant to be validated by the chunker when joining. This + is because it is left to the DPA to decide which sources are trusted. +*/ +func TreeJoin(key Key, getter Getter, depth int) LazySectionReader { + return NewTreeJoiner(NewJoinerParams(key, getter, depth, DefaultChunkSize)).Join() } -func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) { - self = &TreeChunker{} - self.hashFunc = MakeHashFunc(params.Hash) - self.branches = params.Branches - self.hashSize = int64(self.hashFunc().Size()) +/* + When splitting, data is given as an io.Reader, and the key is a hashSize-long byte slice (Key); the root hash of the entire content will fill this once processing finishes. + New chunks to store are stored using the putter which the caller provides. +*/ +func TreeSplit(data io.Reader, size int64, putter Putter) (k Key, wait func(), err error) { + return NewTreeSplitter(NewTreeSplitterParams(data, putter, size, DefaultChunkSize)).Split() +} + +func NewJoinerParams(key Key, getter Getter, depth int, chunkSize int64) *JoinerParams { + hashSize := int64(len(key)) + return &JoinerParams{ + ChunkerParams: ChunkerParams{ + chunkSize: chunkSize, + hashSize: hashSize, + }, + key: key, + getter: getter, + depth: depth, + } +} + +func NewTreeJoiner(params *JoinerParams) *TreeChunker { + self := &TreeChunker{} + self.hashSize = params.hashSize + self.branches = params.chunkSize / self.hashSize + self.key = params.key + self.getter = params.getter + self.depth = params.depth self.chunkSize = self.hashSize * self.branches self.workerCount = 0 + self.jobC = make(chan *hashJob, 2*ChunkProcessors) + self.wg = &sync.WaitGroup{} + self.errC = make(chan error) + self.quitC = make(chan bool) - return + return self } -// func (self *TreeChunker) KeySize() int64 { -// return self.hashSize -// } +func NewTreeSplitterParams(reader io.Reader, putter Putter, size int64, chunkSize int64) *TreeSplitterParams { + hashSize := putter.RefSize() + return &TreeSplitterParams{ + SplitterParams: SplitterParams{ + ChunkerParams: ChunkerParams{ + chunkSize: chunkSize, + hashSize: hashSize, + }, + reader: reader, + putter: putter, + }, + size: size, + } +} + +func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker { + self := &TreeChunker{} + self.data = params.reader + self.dataSize = params.size + self.hashSize = params.hashSize + self.branches = params.chunkSize / self.hashSize + self.key = params.key + self.chunkSize = self.hashSize * self.branches + self.putter = params.putter + self.workerCount = 0 + self.jobC = make(chan *hashJob, 2*ChunkProcessors) + self.wg = &sync.WaitGroup{} + self.errC = make(chan error) + self.quitC = make(chan bool) + + return self +} // String() for pretty printing func (self *Chunk) String() string { @@ -125,45 +236,39 @@ func (self *TreeChunker) decrementWorkerCount() { self.workerCount -= 1 } -func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk) (k Key, wait func(), err error) { +func (self *TreeChunker) Split() (k Key, wait func(), err error) { if self.chunkSize <= 0 { panic("chunker must be initialised") } - jobC :=
make(chan *hashJob, 2*ChunkProcessors) - wg := &sync.WaitGroup{} - storeWg := &sync.WaitGroup{} - errC := make(chan error) - quitC := make(chan bool) - - self.incrementWorkerCount() - self.runHashWorker(jobC, chunkC, errC, quitC, storeWg) + self.runWorker() depth := 0 treeSize := self.chunkSize // takes lowest depth such that chunksize*HashCount^(depth+1) > size // power series, will find the order of magnitude of the data size in base hashCount or numbers of levels of branching in the resulting tree. - for ; treeSize < size; treeSize *= self.branches { + for ; treeSize < self.dataSize; treeSize *= self.branches { depth++ } - key := make([]byte, self.hashFunc().Size()) + key := make([]byte, self.hashSize) // this waitgroup member is released after the root hash is calculated - wg.Add(1) + self.wg.Add(1) //launch actual recursive function passing the waitgroups - go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, storeWg) + go self.split(depth, treeSize/self.branches, key, self.dataSize, self.wg) // closes internal error channel if all subprocesses in the workgroup finished go func() { // waiting for all threads to finish - wg.Wait() - close(errC) + self.wg.Wait() + close(self.errC) }() - defer close(quitC) + defer close(self.quitC) + defer self.putter.Close() select { - case err := <-errC: + case err := <-self.errC: if err != nil { return nil, nil, err } @@ -171,10 +276,10 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk) ( return nil, nil, errOperationTimedOut } - return key, storeWg.Wait, nil + return key, self.putter.Wait, nil } -func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, storeWg *sync.WaitGroup) { +func (self *TreeChunker) split(depth int, treeSize int64, key Key, size int64, parentWg *sync.WaitGroup) { // @@ -189,16 +294,16 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size)) var readBytes int64 for readBytes < size { - n, err := data.Read(chunkData[8+readBytes:]) + n, err := self.data.Read(chunkData[8+readBytes:]) readBytes += int64(n) if err != nil && !(err == io.EOF && readBytes == size) { - errC <- err + self.errC <- err return } } select { - case jobC <- &hashJob{key, chunkData, size, parentWg}: - case <-quitC: + case self.jobC <- &hashJob{key, chunkData, size, parentWg}: + case <-self.quitC: } return } @@ -224,7 +329,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize] childrenWg.Add(1) - self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, storeWg) + self.split(depth-1, treeSize/self.branches, subTreeKey, secSize, childrenWg) i++ pos += treeSize @@ -235,119 +340,89 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade childrenWg.Wait() worker := self.getWorkerCount() - if int64(len(jobC)) > worker && worker < ChunkProcessors { - self.incrementWorkerCount() - self.runHashWorker(jobC, chunkC, errC, quitC, storeWg) + if int64(len(self.jobC)) > worker && worker < ChunkProcessors { + self.runWorker() } select { - case jobC <- &hashJob{key, chunk, size, parentWg}: - case <-quitC: + case self.jobC <- &hashJob{key, chunk, size, parentWg}: + case <-self.quitC: } } -func (self *TreeChunker) runHashWorker(jobC 
chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, storeWg *sync.WaitGroup) { - storeWg.Add(1) - +func (self *TreeChunker) runWorker() { + self.incrementWorkerCount() go func() { defer self.decrementWorkerCount() - defer storeWg.Done() - hasher := self.hashFunc() for { select { - case job, ok := <-jobC: + case job, ok := <-self.jobC: if !ok { return } - // now we got the hashes in the chunk, then hash the chunks - self.hashChunk(hasher, job, chunkC, storeWg) - case <-quitC: + + h, err := self.putter.Put(job.chunk) + if err != nil { + self.errC <- err + return + } + copy(job.key, h) + job.parentWg.Done() + case <-self.quitC: return } } }() } -// The treeChunkers own Hash hashes together -// - the size (of the subtree encoded in the Chunk) -// - the Chunk, ie. the contents read from the input reader -func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, storeWg *sync.WaitGroup) { - hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length - hasher.Write(job.chunk[8:]) // minus 8 []byte length - h := hasher.Sum(nil) - - newChunk := NewChunk(h, nil) - newChunk.SData = job.chunk - newChunk.Size = job.size - - // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) - copy(job.key, h) - // send off new chunk to storage - job.parentWg.Done() - - if chunkC != nil { - //NOTE: this increases the chunk count even if the local node already has this chunk; - //on file upload the node will increase this counter even if the same file has already been uploaded - //So it should be evaluated whether it is worth keeping this counter - //and/or actually better track when the chunk is Put to the local database - //(which may question the need for disambiguation when a completely new chunk has been created - //and/or a chunk is being put to the local DB; for chunk tracking it may be worth distinguishing - newChunkCounter.Inc(1) - chunkC <- newChunk - storeWg.Add(1) - go func() { - defer storeWg.Done() - <-newChunk.dbStoredC - }() - } -} - -func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk) (Key, func(), error) { +func (self *TreeChunker) Append() (Key, func(), error) { return nil, nil, errAppendOppNotSuported } // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { - key Key // root key - chunkC chan *Chunk // chunk channel to send retrieve requests on - chunk *Chunk // size of the entire subtree - off int64 // offset - chunkSize int64 // inherit from chunker - branches int64 // inherit from chunker - hashSize int64 // inherit from chunker + key Key // root key + chunkData ChunkData + off int64 // offset + chunkSize int64 // inherit from chunker + branches int64 // inherit from chunker + hashSize int64 // inherit from chunker depth int + getter Getter } // implements the Joiner interface -func (self *TreeChunker) Join(key Key, chunkC chan *Chunk, depth int) LazySectionReader { +func (self *TreeChunker) Join() LazySectionReader { return &LazyChunkReader{ - key: key, - chunkC: chunkC, + key: self.key, chunkSize: self.chunkSize, branches: self.branches, hashSize: self.hashSize, - depth: depth, + depth: self.depth, + getter: self.getter, } } // Size is meant to be called on the LazySectionReader func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { log.Debug("lazychunkreader.size", "key", self.key) - if self.chunk != nil { - return self.chunk.Size, nil - } - chunk := retrieve(self.key, self.chunkC, quitC) - if chunk == nil { - select { - case 
<-quitC: - return 0, errors.New("aborted") - default: - return 0, fmt.Errorf("root chunk not found for %v", self.key.Hex()) + if self.chunkData == nil { + chunkData, err := self.getter.Get(Reference(self.key)) + if err != nil { + return 0, err + } + if chunkData == nil { + select { + case <-quitC: + return 0, errors.New("aborted") + default: + return 0, fmt.Errorf("root chunk not found for %v", self.key.Hex()) + } } + self.chunkData = chunkData } - self.chunk = chunk - return chunk.Size, nil + return self.chunkData.Size(), nil } // read at can be called numerous times @@ -381,7 +456,7 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= self.chunkSize } wg.Add(1) - go self.join(b, off, off+length, depth, treeSize/self.branches, self.chunk, &wg, errC, quitC) + go self.join(b, off, off+length, depth, treeSize/self.branches, self.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -390,7 +465,6 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { err = <-errC if err != nil { close(quitC) - return 0, err } if off+int64(len(b)) >= size { @@ -399,21 +473,21 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return len(b), nil } -func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level - for chunk.Size < treeSize && depth > self.depth { + for chunkData.Size() < treeSize && depth > self.depth { treeSize /= self.branches depth-- } // leaf chunk found if depth == self.depth { - extra := 8 + eoff - int64(len(chunk.SData)) + extra := 8 + eoff - int64(len(chunkData)) if extra > 0 { eoff -= extra } - copy(b, chunk.SData[8+off:8+eoff]) + copy(b, chunkData[8+off:8+eoff]) return // simply give back the chunks reader for content chunks } @@ -440,9 +514,9 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr } wg.Add(1) go func(j int64) { - childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize] - chunk := retrieve(childKey, self.chunkC, quitC) - if chunk == nil { + childKey := chunkData[8+j*self.hashSize : 8+(j+1)*self.hashSize] + chunkData, err := self.getter.Get(Reference(childKey)) + if err != nil { select { case errC <- fmt.Errorf("chunk %v-%v not found", off, off+treeSize): case <-quitC: @@ -452,41 +526,11 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr if soff < off { soff = off } - self.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/self.branches, chunk, wg, errC, quitC) + self.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/self.branches, chunkData, wg, errC, quitC) }(i) } //for } -// the helper method submits chunks for a key to a oueue (DPA) and -// block until they time out or arrive -// abort if quitC is readable -func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { - log.Debug("retrieve", "key", key) - chunk := NewChunk(key, nil) - chunk.C = make(chan bool) - // submit chunk for retrieval - log.Debug("submit chunk for retrieval", "key", key) - select { - case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally) - case <-quitC: - return 
nil - } - // waiting for the chunk retrieval - log.Debug("waiting for the chunk retrieval", "key", key) - select { // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - - case <-quitC: - // this is how we control process leakage (quitC is closed once join is finished (after timeout)) - return nil - case <-chunk.C: // bells are ringing, data have been delivered - } - if len(chunk.SData) == 0 { - return nil - } - log.Debug("chunk retrieved", "key", key) - return chunk -} - // Read keeps a cursor so cannot be called simulateously, see ReadAt func (self *LazyChunkReader) Read(b []byte) (read int, err error) { log.Debug("lazychunkreader.read", "key", self.key) @@ -510,13 +554,13 @@ func (s *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { case 1: offset += s.off case 2: - if s.chunk == nil { //seek from the end requires rootchunk for size. call Size first + if s.chunkData == nil { //seek from the end requires rootchunk for size. call Size first _, err := s.Size(nil) if err != nil { return 0, fmt.Errorf("can't get size: %v", err) } } - offset += s.chunk.Size + offset += s.chunkData.Size() } if offset < 0 { diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index c1cd48502908..5d9ea26a0183 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "testing" - "time" "github.com/ethereum/go-ethereum/crypto/sha3" ) @@ -40,136 +39,33 @@ type test interface { type chunkerTester struct { inputs map[uint64][]byte - chunks map[string]*Chunk t test } -func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, expectedError error) (key Key, wait func(), err error) { - // reset - self.chunks = make(map[string]*Chunk) - - if self.inputs == nil { - self.inputs = make(map[uint64][]byte) - } - - quitC := make(chan bool) - timeout := time.After(600 * time.Second) - if chunkC != nil { - go func() error { - for { - select { - case <-timeout: - return errors.New("Split timeout error") - case <-quitC: - return nil - case chunk := <-chunkC: - // self.chunks = append(self.chunks, chunk) - self.chunks[chunk.Key.Hex()] = chunk - chunk.markAsStored() - } - - } - }() - } - - var w func() - key, w, err = chunker.Split(data, size, chunkC) - if err != nil && expectedError == nil { - err = fmt.Errorf("Split error: %v", err) - } - if chunkC != nil { - wait = func() { - w() - close(quitC) - } - } else { - wait = func() {} - } - return key, wait, err +// fakeChunkStore doesn't store anything, just implements the ChunkStore interface +// It can be used to inject into a hasherStore if you don't want to actually store data just do the +// hashing +type fakeChunkStore struct { } -func (self *chunkerTester) Append(chunker Splitter, rootKey Key, data io.Reader, chunkC chan *Chunk, expectedError error) (key Key, wait func(), err error) { - quitC := make(chan bool) - timeout := time.After(60 * time.Second) - if chunkC != nil { - go func() error { - for { - select { - case <-timeout: - return errors.New("Append timeout error") - case <-quitC: - return nil - case chunk := <-chunkC: - if chunk != nil { - stored, success := self.chunks[chunk.Key.Hex()] - if !success { - // Requesting data - self.chunks[chunk.Key.Hex()] = chunk - chunk.markAsStored() - } else { - // getting data - chunk.SData = stored.SData - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - chunk.markAsStored() - if chunk.C != nil { - close(chunk.C) - } - } - } - } - } - }() - } - var w func() - key, w, 
err = chunker.Append(rootKey, data, chunkC) - if err != nil && expectedError == nil { - err = fmt.Errorf("Append error: %v", err) - } +// Put doesn't store anything it is just here to implement ChunkStore +func (f *fakeChunkStore) Put(*Chunk) { +} - if chunkC != nil { - wait = func() { - w() - close(quitC) - } - } else { - wait = func() {} - } - return key, wait, err +// Gut doesn't store anything it is just here to implement ChunkStore +func (f *fakeChunkStore) Get(Key) (*Chunk, error) { + return nil, errors.New("FakeChunkStore doesn't support Get") } -func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader { - // reset but not the chunks - - timeout := time.After(600 * time.Second) - i := 0 - go func() error { - for { - select { - case <-timeout: - return errors.New("Join timeout error") - case chunk, ok := <-chunkC: - if !ok { - close(quitC) - return nil - } - // this just mocks the behaviour of a chunk store retrieval - stored, success := self.chunks[chunk.Key.Hex()] - if !success { - return errors.New("Not found") - } - chunk.SData = stored.SData - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - close(chunk.C) - i++ - } - } - }() +// Close doesn't store anything it is just here to implement ChunkStore +func (f *fakeChunkStore) Close() { +} - reader := chunker.Join(key, chunkC, 0) - return reader +func newTestHasherStore(chunkStore ChunkStore, hash string) *hasherStore { + return NewHasherStore(chunkStore, MakeHashFunc(hash), false) } -func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) { +func testRandomBrokenData(n int, tester *chunkerTester) { data := io.LimitReader(rand.Reader, int64(n)) brokendata := brokenLimitReader(data, n, n/2) @@ -182,17 +78,17 @@ func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) { data = io.LimitReader(rand.Reader, int64(n)) brokendata = brokenLimitReader(data, n, n/2) - chunkC := make(chan *Chunk, 1000) + putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash) expectedError := fmt.Errorf("Broken reader") - key, _, err := tester.Split(splitter, brokendata, int64(n), chunkC, expectedError) + key, _, err := TreeSplit(brokendata, int64(n), putGetter) if err == nil || err.Error() != expectedError.Error() { tester.t.Fatalf("Not receiving the correct error! 
Expected %v, received %v", expectedError, err) } tester.t.Logf(" Key = %v\n", key) } -func testRandomData(splitter Splitter, n int, tester *chunkerTester) Key { +func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) Key { if tester.inputs == nil { tester.inputs = make(map[uint64][]byte) } @@ -205,19 +101,23 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) Key { data = io.LimitReader(bytes.NewReader(input), int64(n)) } - chunkC := make(chan *Chunk, 1000) + putGetter := newTestHasherStore(NewMapChunkStore(), hash) - key, wait, err := tester.Split(splitter, data, int64(n), chunkC, nil) + var key Key + var wait func() + var err error + if usePyramid { + key, wait, err = PyramidSplit(data, putGetter, putGetter) + } else { + key, wait, err = TreeSplit(data, int64(n), putGetter) + } if err != nil { tester.t.Fatalf(err.Error()) } tester.t.Logf(" Key = %v\n", key) wait() - chunkC = make(chan *Chunk, 1000) - quitC := make(chan bool) - chunker := NewTreeChunker(NewChunkerParams()) - reader := tester.Join(chunker, key, 0, chunkC, quitC) + reader := TreeJoin(key, putGetter, 0) output := make([]byte, n) r, err := reader.Read(output) if r != n || err != io.EOF { @@ -228,8 +128,6 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) Key { tester.t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", input, output) } } - close(chunkC) - <-quitC return key } @@ -284,10 +182,10 @@ func TestDataAppend(t *testing.T) { data = io.LimitReader(bytes.NewReader(input), int64(n)) } - chunkC := make(chan *Chunk, 1000) + chunkStore := NewMapChunkStore() + putGetter := newTestHasherStore(chunkStore, SHA3Hash) - chunker := NewPyramidChunker(NewChunkerParams()) - key, wait, err := tester.Split(chunker, data, int64(n), chunkC, nil) + key, wait, err := PyramidSplit(data, putGetter, putGetter) if err != nil { tester.t.Fatalf(err.Error()) } @@ -303,19 +201,14 @@ func TestDataAppend(t *testing.T) { appendData = io.LimitReader(bytes.NewReader(appendInput), int64(m)) } - chunkC = make(chan *Chunk, 1000) - - newKey, wait, err := tester.Append(chunker, key, appendData, chunkC, nil) + putGetter = newTestHasherStore(chunkStore, SHA3Hash) + newKey, wait, err := PyramidAppend(key, appendData, putGetter, putGetter) if err != nil { tester.t.Fatalf(err.Error()) } wait() - chunkC = make(chan *Chunk, 1000) - quitC := make(chan bool) - - treeChunker := NewTreeChunker(NewChunkerParams()) - reader := tester.Join(treeChunker, newKey, 0, chunkC, quitC) + reader := TreeJoin(newKey, putGetter, 0) newOutput := make([]byte, n+m) r, err := reader.Read(newOutput) if r != (n + m) { @@ -326,8 +219,6 @@ func TestDataAppend(t *testing.T) { if !bytes.Equal(newOutput, newInput) { tester.t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", newInput, newOutput) } - - close(chunkC) } } @@ -335,36 +226,28 @@ func TestRandomData(t *testing.T) { sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 12287, 12288, 12289, 123456, 2345678} tester := &chunkerTester{t: t} - chunker := NewTreeChunker(NewChunkerParams()) - pyramid := NewPyramidChunker(NewChunkerParams()) for _, s := range sizes { - treeChunkerKey := testRandomData(chunker, s, tester) - pyramidChunkerKey := testRandomData(pyramid, s, tester) + treeChunkerKey := testRandomData(false, SHA3Hash, s, tester) + pyramidChunkerKey := testRandomData(true, SHA3Hash, s, tester) if treeChunkerKey.String() != pyramidChunkerKey.String() { tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: 
%v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) } } - cp := NewChunkerParams() - cp.Hash = BMTHash - chunker = NewTreeChunker(cp) - pyramid = NewPyramidChunker(cp) for _, s := range sizes { - treeChunkerKey := testRandomData(chunker, s, tester) - pyramidChunkerKey := testRandomData(pyramid, s, tester) + treeChunkerKey := testRandomData(false, BMTHash, s, tester) + pyramidChunkerKey := testRandomData(true, BMTHash, s, tester) if treeChunkerKey.String() != pyramidChunkerKey.String() { - tester.t.Fatalf("tree chunker BMT and pyramid chunker BMT key mismatch for size %v \n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) + tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) } } - } -func XTestRandomBrokenData(t *testing.T) { +func TestRandomBrokenData(t *testing.T) { sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 12287, 12288, 12289, 123456, 2345678} tester := &chunkerTester{t: t} - chunker := NewTreeChunker(NewChunkerParams()) for _, s := range sizes { - testRandomBrokenData(chunker, s, tester) + testRandomBrokenData(s, tester) } } @@ -376,38 +259,31 @@ func benchReadAll(reader LazySectionReader) { } } -func benchmarkJoin(n int, t *testing.B) { +func benchmarkSplitJoin(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { - chunker := NewTreeChunker(NewChunkerParams()) - tester := &chunkerTester{t: t} data := testDataReader(n) - chunkC := make(chan *Chunk, 1000) - - key, wait, err := tester.Split(chunker, data, int64(n), chunkC, nil) + putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash) + key, wait, err := PyramidSplit(data, putGetter, putGetter) if err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } wait() - chunkC = make(chan *Chunk, 1000) - quitC := make(chan bool) - reader := tester.Join(chunker, key, i, chunkC, quitC) + reader := TreeJoin(key, putGetter, 0) benchReadAll(reader) - close(chunkC) - <-quitC } } func benchmarkSplitTreeSHA3(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { - chunker := NewTreeChunker(NewChunkerParams()) - tester := &chunkerTester{t: t} data := testDataReader(n) - _, _, err := tester.Split(chunker, data, int64(n), nil, nil) + putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash) + + _, _, err := TreeSplit(data, int64(n), putGetter) if err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } } } @@ -415,14 +291,12 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) { func benchmarkSplitTreeBMT(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { - cp := NewChunkerParams() - cp.Hash = BMTHash - chunker := NewTreeChunker(cp) - tester := &chunkerTester{t: t} data := testDataReader(n) - _, _, err := tester.Split(chunker, data, int64(n), nil, nil) + putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash) + + _, _, err := TreeSplit(data, int64(n), putGetter) if err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } } } @@ -430,12 +304,12 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) { func benchmarkSplitPyramidSHA3(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { - splitter := NewPyramidChunker(NewChunkerParams()) - tester := &chunkerTester{t: t} data := testDataReader(n) - _, _, err := tester.Split(splitter, data, int64(n), nil, nil) + putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash) + + _, _, err := PyramidSplit(data, putGetter, putGetter) if 
err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } } @@ -444,51 +318,48 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) { func benchmarkSplitPyramidBMT(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { - cp := NewChunkerParams() - cp.Hash = BMTHash - splitter := NewPyramidChunker(cp) - tester := &chunkerTester{t: t} data := testDataReader(n) - _, _, err := tester.Split(splitter, data, int64(n), nil, nil) + putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash) + + _, _, err := PyramidSplit(data, putGetter, putGetter) if err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } } } -func benchmarkAppendPyramid(n, m int, t *testing.B) { +func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { - chunker := NewPyramidChunker(NewChunkerParams()) - tester := &chunkerTester{t: t} data := testDataReader(n) data1 := testDataReader(m) - chunkC := make(chan *Chunk, 1000) - key, wait, err := tester.Split(chunker, data, int64(n), chunkC, nil) + chunkStore := NewMapChunkStore() + putGetter := newTestHasherStore(chunkStore, SHA3Hash) + + key, wait, err := PyramidSplit(data, putGetter, putGetter) if err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } wait() - chunkC = make(chan *Chunk, 1000) - _, wait, err = tester.Append(chunker, key, data1, chunkC, nil) + putGetter = newTestHasherStore(chunkStore, SHA3Hash) + _, wait, err = PyramidAppend(key, data1, putGetter, putGetter) if err != nil { - tester.t.Fatalf(err.Error()) + t.Fatalf(err.Error()) } wait() - close(chunkC) } } -func BenchmarkJoin_2(t *testing.B) { benchmarkJoin(100, t) } -func BenchmarkJoin_3(t *testing.B) { benchmarkJoin(1000, t) } -func BenchmarkJoin_4(t *testing.B) { benchmarkJoin(10000, t) } -func BenchmarkJoin_5(t *testing.B) { benchmarkJoin(100000, t) } -func BenchmarkJoin_6(t *testing.B) { benchmarkJoin(1000000, t) } -func BenchmarkJoin_7(t *testing.B) { benchmarkJoin(10000000, t) } +func BenchmarkSplitJoin_2(t *testing.B) { benchmarkSplitJoin(100, t) } +func BenchmarkSplitJoin_3(t *testing.B) { benchmarkSplitJoin(1000, t) } +func BenchmarkSplitJoin_4(t *testing.B) { benchmarkSplitJoin(10000, t) } +func BenchmarkSplitJoin_5(t *testing.B) { benchmarkSplitJoin(100000, t) } +func BenchmarkSplitJoin_6(t *testing.B) { benchmarkSplitJoin(1000000, t) } +func BenchmarkSplitJoin_7(t *testing.B) { benchmarkSplitJoin(10000000, t) } -// func BenchmarkJoin_8(t *testing.B) { benchmarkJoin(100000000, t) } +// func BenchmarkSplitJoin_8(t *testing.B) { benchmarkJoin(100000000, t) } func BenchmarkSplitTreeSHA3_2(t *testing.B) { benchmarkSplitTreeSHA3(100, t) } func BenchmarkSplitTreeSHA3_2h(t *testing.B) { benchmarkSplitTreeSHA3(500, t) } @@ -538,14 +409,14 @@ func BenchmarkSplitPyramidBMT_7(t *testing.B) { benchmarkSplitPyramidBMT(100000 // func BenchmarkSplitPyramidBMT_8(t *testing.B) { benchmarkSplitPyramidBMT(100000000, t) } -func BenchmarkAppendPyramid_2(t *testing.B) { benchmarkAppendPyramid(100, 1000, t) } -func BenchmarkAppendPyramid_2h(t *testing.B) { benchmarkAppendPyramid(500, 1000, t) } -func BenchmarkAppendPyramid_3(t *testing.B) { benchmarkAppendPyramid(1000, 1000, t) } -func BenchmarkAppendPyramid_4(t *testing.B) { benchmarkAppendPyramid(10000, 1000, t) } -func BenchmarkAppendPyramid_4h(t *testing.B) { benchmarkAppendPyramid(50000, 1000, t) } -func BenchmarkAppendPyramid_5(t *testing.B) { benchmarkAppendPyramid(1000000, 1000, t) } -func BenchmarkAppendPyramid_6(t *testing.B) { benchmarkAppendPyramid(1000000, 1000, t) 
} -func BenchmarkAppendPyramid_7(t *testing.B) { benchmarkAppendPyramid(10000000, 1000, t) } +func BenchmarkSplitAppendPyramid_2(t *testing.B) { benchmarkSplitAppendPyramid(100, 1000, t) } +func BenchmarkSplitAppendPyramid_2h(t *testing.B) { benchmarkSplitAppendPyramid(500, 1000, t) } +func BenchmarkSplitAppendPyramid_3(t *testing.B) { benchmarkSplitAppendPyramid(1000, 1000, t) } +func BenchmarkSplitAppendPyramid_4(t *testing.B) { benchmarkSplitAppendPyramid(10000, 1000, t) } +func BenchmarkSplitAppendPyramid_4h(t *testing.B) { benchmarkSplitAppendPyramid(50000, 1000, t) } +func BenchmarkSplitAppendPyramid_5(t *testing.B) { benchmarkSplitAppendPyramid(1000000, 1000, t) } +func BenchmarkSplitAppendPyramid_6(t *testing.B) { benchmarkSplitAppendPyramid(1000000, 1000, t) } +func BenchmarkSplitAppendPyramid_7(t *testing.B) { benchmarkSplitAppendPyramid(10000000, 1000, t) } // func BenchmarkAppendPyramid_8(t *testing.B) { benchmarkAppendPyramid(100000000, 1000, t) } diff --git a/swarm/storage/chunkstore.go b/swarm/storage/chunkstore.go new file mode 100644 index 000000000000..57f4c11766b0 --- /dev/null +++ b/swarm/storage/chunkstore.go @@ -0,0 +1,67 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package storage + +import "sync" + +/* +ChunkStore interface is implemented by : + +- MemStore: a memory cache +- DbStore: local disk/db store +- LocalStore: a combination (sequence of) memStore and dbStore +- NetStore: cloud storage abstraction layer +- DPA: local requests for swarm storage and retrieval +- FakeChunkStore: dummy store which doesn't store anything just implements the interface +*/ +type ChunkStore interface { + Put(*Chunk) // effectively there is no error even if there is an error + Get(Key) (*Chunk, error) + Close() +} + +// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory. +type MapChunkStore struct { + chunks map[string]*Chunk + mu sync.RWMutex +} + +func NewMapChunkStore() *MapChunkStore { + return &MapChunkStore{ + chunks: make(map[string]*Chunk), + } +} + +func (m *MapChunkStore) Put(chunk *Chunk) { + m.mu.Lock() + defer m.mu.Unlock() + m.chunks[chunk.Key.Hex()] = chunk + chunk.markAsStored() +} + +func (m *MapChunkStore) Get(key Key) (*Chunk, error) { + m.mu.RLock() + defer m.mu.RUnlock() + chunk := m.chunks[key.Hex()] + if chunk == nil { + return nil, ErrChunkNotFound + } + return chunk, nil +} + +func (m *MapChunkStore) Close() { +} diff --git a/swarm/storage/dpa.go b/swarm/storage/dpa.go index ecf215964b51..bbdf71048346 100644 --- a/swarm/storage/dpa.go +++ b/swarm/storage/dpa.go @@ -18,12 +18,8 @@ package storage import ( "errors" - "fmt" "io" - "sync" "time" - - "github.com/ethereum/go-ethereum/log" ) /* @@ -39,12 +35,8 @@ implementation for storage or retrieval. 
*/ const ( - storeChanCapacity = 100 - retrieveChanCapacity = 100 singletonSwarmDbCapacity = 50000 singletonSwarmCacheCapacity = 500 - maxStoreProcesses = 8 - maxRetrieveProcesses = 8 ) var ( @@ -56,14 +48,17 @@ var ( type DPA struct { ChunkStore - storeC chan *Chunk - retrieveC chan *Chunk - Chunker Chunker - - lock sync.Mutex - running bool - wg *sync.WaitGroup - quitC chan bool + hashFunc SwarmHasher +} + +type DPAParams struct { + Hash string +} + +func NewDPAParams() *DPAParams { + return &DPAParams{ + Hash: SHA3Hash, + } } // for testing locally @@ -79,14 +74,14 @@ func NewLocalDPA(datadir string, basekey []byte) (*DPA, error) { return NewDPA(&LocalStore{ memStore: NewMemStore(dbStore, singletonSwarmCacheCapacity), DbStore: dbStore, - }, NewChunkerParams()), nil + }, NewDPAParams()), nil } -func NewDPA(store ChunkStore, params *ChunkerParams) *DPA { - chunker := NewPyramidChunker(params) +func NewDPA(store ChunkStore, params *DPAParams) *DPA { + hashFunc := MakeHashFunc(params.Hash) return &DPA{ - Chunker: chunker, ChunkStore: store, + hashFunc: hashFunc, } } @@ -95,83 +90,13 @@ func NewDPA(store ChunkStore, params *ChunkerParams) *DPA { // Chunk retrieval blocks on netStore requests with a timeout so reader will // report error if retrieval of chunks within requested range time out. func (self *DPA) Retrieve(key Key) LazySectionReader { - return self.Chunker.Join(key, self.retrieveC, 0) + getter := NewHasherStore(self.ChunkStore, self.hashFunc, len(key) > self.hashFunc().Size()) + return TreeJoin(key, getter, 0) } // Public API. Main entry point for document storage directly. Used by the // FS-aware API and httpaccess -func (self *DPA) Store(data io.Reader, size int64) (key Key, wait func(), err error) { - return self.Chunker.Split(data, size, self.storeC) -} - -func (self *DPA) Start() { - self.lock.Lock() - defer self.lock.Unlock() - if self.running { - return - } - self.running = true - self.retrieveC = make(chan *Chunk, retrieveChanCapacity) - self.storeC = make(chan *Chunk, storeChanCapacity) - self.quitC = make(chan bool) - self.storeLoop() - self.retrieveLoop() -} - -func (self *DPA) Stop() { - self.lock.Lock() - defer self.lock.Unlock() - if !self.running { - return - } - self.running = false - close(self.quitC) -} - -// retrieveLoop dispatches the parallel chunk retrieval requests received on the -// retrieve channel to its ChunkStore (NetStore or LocalStore) -func (self *DPA) retrieveLoop() { - for i := 0; i < maxRetrieveProcesses; i++ { - go self.retrieveWorker() - } - log.Trace(fmt.Sprintf("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses)) -} - -func (self *DPA) retrieveWorker() { - for chunk := range self.retrieveC { - storedChunk, err := self.Get(chunk.Key) - if err != nil { - log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err)) - } else { - chunk.SData = storedChunk.SData - chunk.Size = storedChunk.Size - } - close(chunk.C) - - select { - case <-self.quitC: - return - default: - } - } -} - -// storeLoop dispatches the parallel chunk store request processors -// received on the store channel to its ChunkStore (NetStore or LocalStore) -func (self *DPA) storeLoop() { - for i := 0; i < maxStoreProcesses; i++ { - go self.storeWorker() - } - log.Trace(fmt.Sprintf("dpa: store spawning %v workers", maxStoreProcesses)) -} - -func (self *DPA) storeWorker() { - for chunk := range self.storeC { - self.Put(chunk) - select { - case <-self.quitC: - return - default: - } - } +func (self *DPA) Store(data io.Reader, size int64, toEncrypt bool) 
(key Key, wait func(), err error) { + putter := NewHasherStore(self.ChunkStore, self.hashFunc, toEncrypt) + return PyramidSplit(data, putter, putter) } diff --git a/swarm/storage/dpa_test.go b/swarm/storage/dpa_test.go index 8b486fee21cb..1126f05a52e5 100644 --- a/swarm/storage/dpa_test.go +++ b/swarm/storage/dpa_test.go @@ -27,6 +27,11 @@ import ( const testDataSize = 0x1000000 func TestDPArandom(t *testing.T) { + testDpaRandom(false, t) + testDpaRandom(true, t) +} + +func testDpaRandom(toEncrypt bool, t *testing.T) { tdb, err := newTestDbStore(false) if err != nil { t.Fatalf("init dbStore failed: %v", err) @@ -39,17 +44,12 @@ func TestDPArandom(t *testing.T) { memStore: memStore, DbStore: db, } - chunker := NewTreeChunker(NewChunkerParams()) - dpa := &DPA{ - Chunker: chunker, - ChunkStore: localStore, - } - dpa.Start() - defer dpa.Stop() + + dpa := NewDPA(localStore, NewDPAParams()) defer os.RemoveAll("/tmp/bzz") reader, slice := generateRandomData(testDataSize) - key, wait, err := dpa.Store(reader, testDataSize) + key, wait, err := dpa.Store(reader, testDataSize, toEncrypt) if err != nil { t.Errorf("Store error: %v", err) } @@ -86,6 +86,11 @@ func TestDPArandom(t *testing.T) { } func TestDPA_capacity(t *testing.T) { + testDPA_capacity(false, t) + testDPA_capacity(true, t) +} + +func testDPA_capacity(toEncrypt bool, t *testing.T) { tdb, err := newTestDbStore(false) if err != nil { t.Fatalf("init dbStore failed: %v", err) @@ -97,14 +102,9 @@ func TestDPA_capacity(t *testing.T) { memStore: memStore, DbStore: db, } - chunker := NewTreeChunker(NewChunkerParams()) - dpa := &DPA{ - Chunker: chunker, - ChunkStore: localStore, - } - dpa.Start() + dpa := NewDPA(localStore, NewDPAParams()) reader, slice := generateRandomData(testDataSize) - key, wait, err := dpa.Store(reader, testDataSize) + key, wait, err := dpa.Store(reader, testDataSize, toEncrypt) if err != nil { t.Errorf("Store error: %v", err) } diff --git a/swarm/storage/encryption.go b/swarm/storage/encryption.go deleted file mode 100644 index 909024b73238..000000000000 --- a/swarm/storage/encryption.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . 
- -package storage - -// -// import "github.com/ethereum/go-ethereum/swarm/storage/encryption" -// -// type HasherStore interface { -// Put([]byte) Key -// Get(Key) ([]byte, error) -// } -// -// type PlainHasherStore struct { -// store ChunkStore -// } -// -// type EncryptedHasherStore struct { -// PlainHasherStore -// dataEncryption encryption.Encryption -// spanEncryption encryption.Encryption -// } -// -// func (e *PlainHasherStore) Put([]byte) Key { -// -// } diff --git a/swarm/storage/encryption/encryption.go b/swarm/storage/encryption/encryption.go index 54b1ed7b342c..e50f2163db99 100644 --- a/swarm/storage/encryption/encryption.go +++ b/swarm/storage/encryption/encryption.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -23,6 +23,8 @@ import ( "hash" ) +const KeyLength = 32 + type Key []byte type Encryption interface { @@ -100,6 +102,12 @@ func (e *encryption) transform(data []byte, key Key) []byte { return transformedData } +func GenerateRandomKey() (Key, error) { + key := make([]byte, KeyLength) + _, err := rand.Read(key) + return key, err +} + func min(x, y int) int { if x < y { return x diff --git a/swarm/storage/encryption/encryption_test.go b/swarm/storage/encryption/encryption_test.go index 697e39aaed65..5ea546d6bfc4 100644 --- a/swarm/storage/encryption/encryption_test.go +++ b/swarm/storage/encryption/encryption_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go new file mode 100644 index 000000000000..a247816b7871 --- /dev/null +++ b/swarm/storage/hasherstore.go @@ -0,0 +1,229 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . 
+ +package storage + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/swarm/storage/encryption" +) + +type chunkEncryption struct { + spanEncryption encryption.Encryption + dataEncryption encryption.Encryption +} + +type hasherStore struct { + store ChunkStore + hashFunc SwarmHasher + chunkEncryption *chunkEncryption + hashSize int // content hash size + refSize int64 // reference size (content hash + possibly encryption key) + wg *sync.WaitGroup + closed chan struct{} +} + +func newChunkEncryption(chunkSize, refSize int64) *chunkEncryption { + return &chunkEncryption{ + spanEncryption: encryption.New(0, uint32(chunkSize/refSize), sha3.NewKeccak256), + dataEncryption: encryption.New(int(chunkSize), 0, sha3.NewKeccak256), + } +} + +// NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces. +// With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore +// and the hasherStore will take core of encryption/decryption of data if necessary +func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { + var chunkEncryption *chunkEncryption + + hashSize := hashFunc().Size() + refSize := int64(hashSize) + if toEncrypt { + refSize += encryption.KeyLength + chunkEncryption = newChunkEncryption(DefaultChunkSize, refSize) + } + + return &hasherStore{ + store: chunkStore, + hashFunc: hashFunc, + chunkEncryption: chunkEncryption, + hashSize: hashSize, + refSize: refSize, + wg: &sync.WaitGroup{}, + closed: make(chan struct{}), + } +} + +// Put stores the chunkData into the ChunkStore of the hasherStore and returns the reference. +// If hasherStore has a chunkEncryption object, the data will be encrypted. +// Asynchronous function, the data will not necessarily be stored when it returns. +func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { + c := chunkData + size := chunkData.Size() + var encryptionKey encryption.Key + if h.chunkEncryption != nil { + var err error + c, encryptionKey, err = h.encryptChunkData(chunkData) + if err != nil { + return nil, err + } + } + chunk := h.createChunk(c, size) + + h.storeChunk(chunk) + + return Reference(append(chunk.Key, encryptionKey...)), nil +} + +// Get returns data of the chunk with the given reference (retrieved from the ChunkStore of hasherStore). +// If the data is encrypted and the reference contains an encryption key, it will be decrypted before +// return. +func (h *hasherStore) Get(ref Reference) (ChunkData, error) { + key, encryptionKey, err := parseReference(ref, int(h.hashSize)) + if err != nil { + return nil, err + } + toDecrypt := (encryptionKey != nil) + + chunk, err := h.store.Get(key) + if err != nil { + return nil, err + } + + chunkData := chunk.SData + if toDecrypt { + var err error + chunkData, err = h.decryptChunkData(chunkData, encryptionKey) + if err != nil { + return nil, err + } + } + return chunkData, nil +} + +// Close indicates that no more chunks will be put with the hasherStore, so the Wait +// function can return when all the previously put chunks has been stored. 
+func (h *hasherStore) Close() { + close(h.closed) +} + +// Wait returns when +// 1) the Close() function has been called and +// 2) all the chunks which has been Put has been stored +func (h *hasherStore) Wait() { + <-h.closed + h.wg.Wait() +} + +func (h *hasherStore) createHash(chunkData ChunkData) Key { + hasher := h.hashFunc() + hasher.ResetWithLength(chunkData[:8]) // 8 bytes of length + hasher.Write(chunkData[8:]) // minus 8 []byte length + return hasher.Sum(nil) +} + +func (h *hasherStore) createChunk(chunkData ChunkData, chunkSize int64) *Chunk { + hash := h.createHash(chunkData) + chunk := NewChunk(hash, nil) + chunk.SData = chunkData + chunk.Size = chunkSize + + return chunk +} + +func (p *hasherStore) encryptChunkData(chunkData ChunkData) (ChunkData, encryption.Key, error) { + if len(chunkData) < 8 { + return nil, nil, fmt.Errorf("Invalid ChunkData, min length 8 got %v", len(chunkData)) + } + + encryptionKey, err := encryption.GenerateRandomKey() + if err != nil { + return nil, nil, err + } + + encryptedSpan, err := p.chunkEncryption.spanEncryption.Encrypt(chunkData[:8], encryptionKey) + if err != nil { + return nil, nil, err + } + encryptedData, err := p.chunkEncryption.dataEncryption.Encrypt(chunkData[8:], encryptionKey) + if err != nil { + return nil, nil, err + } + c := make(ChunkData, len(encryptedSpan)+len(encryptedData)) + copy(c[:8], encryptedSpan) + copy(c[8:], encryptedData) + return c, encryptionKey, nil +} + +func (h *hasherStore) decryptChunkData(chunkData ChunkData, encryptionKey encryption.Key) (ChunkData, error) { + if len(chunkData) < 8 { + return nil, fmt.Errorf("Invalid ChunkData, min length 8 got %v", len(chunkData)) + } + + decryptedSpan, err := h.chunkEncryption.spanEncryption.Decrypt(chunkData[:8], encryptionKey) + if err != nil { + return nil, err + } + + decryptedData, err := h.chunkEncryption.dataEncryption.Decrypt(chunkData[8:], encryptionKey) + if err != nil { + return nil, err + } + + // removing extra bytes which were just added for padding + length := ChunkData(decryptedSpan).Size() + for length > DefaultChunkSize { + length = length + (DefaultChunkSize - 1) + length = length / DefaultChunkSize + length *= h.refSize + } + + c := make(ChunkData, length+8) + copy(c[:8], decryptedSpan) + copy(c[8:], decryptedData[:length]) + + return c[:length+8], nil +} + +func (h *hasherStore) RefSize() int64 { + return h.refSize +} + +func (h *hasherStore) storeChunk(chunk *Chunk) { + h.wg.Add(1) + go func() { + <-chunk.dbStoredC + h.wg.Done() + }() + h.store.Put(chunk) +} + +func parseReference(ref Reference, hashSize int) (Key, encryption.Key, error) { + encryptedKeyLength := hashSize + encryption.KeyLength + switch len(ref) { + case KeyLength: + return Key(ref), nil, nil + case encryptedKeyLength: + encKeyIdx := len(ref) - encryption.KeyLength + return Key(ref[:encKeyIdx]), encryption.Key(ref[encKeyIdx:]), nil + default: + return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, encryptedKeyLength, len(ref)) + } + +} diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go new file mode 100644 index 000000000000..bb415826ab9e --- /dev/null +++ b/swarm/storage/hasherstore_test.go @@ -0,0 +1,128 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. 
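Before the table-driven test below, a small sketch of the Put/Get round trip that hasherStore provides. The payload and the main wrapper are made up for illustration; NewMapChunkStore is the in-memory ChunkStore the test itself uses.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

func main() {
	toEncrypt := true
	hs := storage.NewHasherStore(storage.NewMapChunkStore(), storage.MakeHashFunc(storage.SHA3Hash), toEncrypt)

	// ChunkData is the raw payload prefixed with its length as 8 little-endian bytes.
	payload := []byte("hello swarm")
	chunkData := make(storage.ChunkData, 8+len(payload))
	binary.LittleEndian.PutUint64(chunkData[:8], uint64(len(payload)))
	copy(chunkData[8:], payload)

	// Put encrypts (toEncrypt is true) and stores the chunk; the returned Reference
	// is the content hash followed by the per-chunk encryption key.
	ref, err := hs.Put(chunkData)
	if err != nil {
		panic(err)
	}

	// No more chunks will be Put; Wait returns once everything is in the ChunkStore.
	hs.Close()
	hs.Wait()

	// Get fetches the chunk by its hash and decrypts it transparently.
	roundTripped, err := hs.Get(ref)
	if err != nil {
		panic(err)
	}
	fmt.Printf("size=%d data=%q\n", roundTripped.Size(), roundTripped.Data())
}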
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package storage + +import ( + "bytes" + "crypto/rand" + "encoding/binary" + "testing" + + "github.com/ethereum/go-ethereum/swarm/storage/encryption" + + "github.com/ethereum/go-ethereum/common" +) + +func TestHasherStore(t *testing.T) { + var tests = []struct { + chunkLength int + toEncrypt bool + }{ + {10, false}, + {100, false}, + {1000, false}, + {4096, false}, + {10, true}, + {100, true}, + {1000, true}, + {4096, true}, + } + + for _, tt := range tests { + chunkStore := NewMapChunkStore() + hasherStore := NewHasherStore(chunkStore, MakeHashFunc(SHA3Hash), tt.toEncrypt) + + // Put two random chunks into the hasherStore + chunkData1 := generateRandomChunkData(tt.chunkLength) + key1, err := hasherStore.Put(chunkData1) + if err != nil { + t.Fatalf("Expected no error got \"%v\"", err) + } + + chunkData2 := generateRandomChunkData(tt.chunkLength) + key2, err := hasherStore.Put(chunkData2) + if err != nil { + t.Fatalf("Expected no error got \"%v\"", err) + } + + hasherStore.Close() + + // Wait until chunks are really stored + hasherStore.Wait() + + // Get the first chunk + retrievedChunkData1, err := hasherStore.Get(key1) + if err != nil { + t.Fatalf("Expected no error, got \"%v\"", err) + } + + // Retrieved data should be same as the original + if !bytes.Equal(chunkData1, retrievedChunkData1) { + t.Fatalf("Expected retrieved chunk data %v, got %v", common.Bytes2Hex(chunkData1), common.Bytes2Hex(retrievedChunkData1)) + } + + // Get the second chunk + retrievedChunkData2, err := hasherStore.Get(key2) + if err != nil { + t.Fatalf("Expected no error, got \"%v\"", err) + } + + // Retrieved data should be same as the original + if !bytes.Equal(chunkData2, retrievedChunkData2) { + t.Fatalf("Expected retrieved chunk data %v, got %v", common.Bytes2Hex(chunkData2), common.Bytes2Hex(retrievedChunkData2)) + } + + hash1, encryptionKey1, err := parseReference(key1, hasherStore.hashSize) + if err != nil { + t.Fatalf("Expected no error, got \"%v\"", err) + } + + if tt.toEncrypt { + if encryptionKey1 == nil { + t.Fatal("Expected non-nil encryption key, got nil") + } else if len(encryptionKey1) != encryption.KeyLength { + t.Fatalf("Expected encryption key length %v, got %v", encryption.KeyLength, len(encryptionKey1)) + } + } + if !tt.toEncrypt && encryptionKey1 != nil { + t.Fatalf("Expected nil encryption key, got key with length %v", len(encryptionKey1)) + } + + // Check if chunk data in store is encrypted or not + chunkInStore, err := chunkStore.Get(hash1) + if err != nil { + t.Fatalf("Expected no error got \"%v\"", err) + } + + chunkDataInStore := chunkInStore.SData + + if tt.toEncrypt && bytes.Equal(chunkData1, chunkDataInStore) { + t.Fatalf("Chunk expected to be encrypted but it is stored without encryption") + } + if !tt.toEncrypt && !bytes.Equal(chunkData1, chunkDataInStore) { + t.Fatalf("Chunk expected to be not encrypted but 
stored content is different. Expected %v got %v", common.Bytes2Hex(chunkData1), common.Bytes2Hex(chunkDataInStore)) + } + } +} + +func generateRandomChunkData(length int) ChunkData { + chunkData := make([]byte, length) + + rand.Read(chunkData) + binary.LittleEndian.PutUint64(chunkData[:8], uint64(len(chunkData)-8)) + return chunkData +} diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index d7a6d78f8d83..faa0472a3341 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -172,7 +172,7 @@ func testIterator(t *testing.T, mock bool) { } defer db.close() - FakeChunk(getDefaultChunkSize(), chunkcount, chunks) + FakeChunk(DefaultChunkSize, chunkcount, chunks) wg := &sync.WaitGroup{} wg.Add(len(chunks)) diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index edb784df34b0..81700409a904 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -65,9 +65,8 @@ var ( ) const ( - ChunkProcessors = 8 - DefaultBranches int64 = 128 - splitTimeout = time.Minute * 5 + ChunkProcessors = 8 + splitTimeout = time.Minute * 5 ) const ( @@ -75,18 +74,39 @@ const ( TreeChunk = 1 ) -type ChunkerParams struct { - Branches int64 - Hash string +type PyramidSplitterParams struct { + SplitterParams + getter Getter } -func NewChunkerParams() *ChunkerParams { - return &ChunkerParams{ - Branches: DefaultBranches, - Hash: SHA3Hash, +func NewPyramidSplitterParams(key Key, reader io.Reader, putter Putter, getter Getter, chunkSize int64) *PyramidSplitterParams { + hashSize := putter.RefSize() + return &PyramidSplitterParams{ + SplitterParams: SplitterParams{ + ChunkerParams: ChunkerParams{ + chunkSize: chunkSize, + hashSize: hashSize, + }, + reader: reader, + putter: putter, + key: key, + }, + getter: getter, } } +/* + When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. + New chunks to store are store using the putter which the caller provides. 
+*/ +func PyramidSplit(reader io.Reader, putter Putter, getter Getter) (Key, func(), error) { + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split() +} + +func PyramidAppend(key Key, reader io.Reader, putter Putter, getter Getter) (Key, func(), error) { + return NewPyramidSplitter(NewPyramidSplitterParams(key, reader, putter, getter, DefaultChunkSize)).Append() +} + // Entry to create a tree node type TreeEntry struct { level int @@ -112,41 +132,56 @@ func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry { // Used by the hash processor to create a data/tree chunk and send to storage type chunkJob struct { - key Key - chunk []byte - size int64 - parentWg *sync.WaitGroup - chunkType int // used to identify the tree related chunks for debugging - chunkLvl int // leaf-1 is level 0 and goes upwards until it reaches root + key Key + chunk []byte + parentWg *sync.WaitGroup } type PyramidChunker struct { - hashFunc SwarmHasher chunkSize int64 hashSize int64 branches int64 + reader io.Reader + putter Putter + getter Getter + key Key workerCount int64 workerLock sync.RWMutex + jobC chan *chunkJob + wg *sync.WaitGroup + errC chan error + quitC chan bool + rootKey []byte + chunkLevel [][]*TreeEntry } -func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) { +func NewPyramidSplitter(params *PyramidSplitterParams) (self *PyramidChunker) { self = &PyramidChunker{} - self.hashFunc = MakeHashFunc(params.Hash) - self.branches = params.Branches - self.hashSize = int64(self.hashFunc().Size()) + self.reader = params.reader + self.hashSize = params.hashSize + self.branches = params.chunkSize / self.hashSize self.chunkSize = self.hashSize * self.branches + self.putter = params.putter + self.getter = params.getter + self.key = params.key self.workerCount = 0 + self.jobC = make(chan *chunkJob, 2*ChunkProcessors) + self.wg = &sync.WaitGroup{} + self.errC = make(chan error) + self.quitC = make(chan bool) + self.rootKey = make([]byte, self.hashSize) + self.chunkLevel = make([][]*TreeEntry, self.branches) return } -func (self *PyramidChunker) Join(key Key, chunkC chan *Chunk, depth int) LazySectionReader { +func (self *PyramidChunker) Join(key Key, getter Getter, depth int) LazySectionReader { return &LazyChunkReader{ key: key, - chunkC: chunkC, depth: depth, chunkSize: self.chunkSize, branches: self.branches, hashSize: self.hashSize, + getter: getter, } } @@ -168,204 +203,179 @@ func (self *PyramidChunker) decrementWorkerCount() { self.workerCount -= 1 } -func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk) (k Key, wait func(), err error) { - log.Debug("pyramid.chunker: Split()", "size", size) - jobC := make(chan *chunkJob, 2*ChunkProcessors) - wg := &sync.WaitGroup{} - storageWG := &sync.WaitGroup{} - storageWG.Add(1) - errC := make(chan error) - quitC := make(chan bool) - rootKey := make([]byte, self.hashSize) - chunkLevel := make([][]*TreeEntry, self.branches) +func (self *PyramidChunker) Split() (k Key, wait func(), err error) { + log.Debug("pyramid.chunker: Split()") - wg.Add(1) - self.prepareChunks(false, chunkLevel, data, rootKey, quitC, wg, jobC, chunkC, errC, storageWG) + self.wg.Add(1) + self.prepareChunks(false) // closes internal error channel if all subprocesses in the workgroup finished go func() { // waiting for all chunks to finish - wg.Wait() + self.wg.Wait() //We close errC here because this is passed down to 8 parallel routines underneath. // if a error happens in one of them.. 
that particular routine raises error... // once they all complete successfully, the control comes back and we can safely close this here. - close(errC) + close(self.errC) }() - defer close(quitC) + defer close(self.quitC) + defer self.putter.Close() select { - case err := <-errC: + case err := <-self.errC: if err != nil { return nil, nil, err } case <-time.NewTimer(splitTimeout).C: } - return rootKey, storageWG.Wait, nil + return self.rootKey, self.putter.Wait, nil + } -func (self *PyramidChunker) Append(key Key, data io.Reader, chunkC chan *Chunk) (k Key, wait func(), err error) { +func (self *PyramidChunker) Append() (k Key, wait func(), err error) { log.Debug("pyramid.chunker: Append()") - quitC := make(chan bool) - rootKey := make([]byte, self.hashSize) - chunkLevel := make([][]*TreeEntry, self.branches) - // Load the right most unfinished tree chunks in every level - self.loadTree(chunkLevel, key, chunkC, quitC) + self.loadTree() - jobC := make(chan *chunkJob, 2*ChunkProcessors) - wg := &sync.WaitGroup{} - errC := make(chan error) - - storageWG := &sync.WaitGroup{} - storageWG.Add(1) - - wg.Add(1) - self.prepareChunks(true, chunkLevel, data, rootKey, quitC, wg, jobC, chunkC, errC, storageWG) + self.wg.Add(1) + self.prepareChunks(true) // closes internal error channel if all subprocesses in the workgroup finished go func() { // waiting for all chunks to finish - wg.Wait() + self.wg.Wait() - close(errC) + close(self.errC) }() - defer close(quitC) + defer close(self.quitC) + defer self.putter.Close() select { - case err := <-errC: + case err := <-self.errC: if err != nil { return nil, nil, err } case <-time.NewTimer(splitTimeout).C: } - return rootKey, storageWG.Wait, nil + + return self.rootKey, self.putter.Wait, nil } -func (self *PyramidChunker) processor(id int64, jobC chan *chunkJob, chunkC chan *Chunk, errC chan error, quitC chan bool, storageWG *sync.WaitGroup) { +func (self *PyramidChunker) processor(id int64) { defer self.decrementWorkerCount() - defer storageWG.Done() - hasher := self.hashFunc() for { select { - case job, ok := <-jobC: + case job, ok := <-self.jobC: if !ok { return } - self.processChunk(id, hasher, job, chunkC, storageWG) - case <-quitC: + self.processChunk(id, job) + case <-self.quitC: return } } } -func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJob, chunkC chan *Chunk, storageWG *sync.WaitGroup) { - hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length - hasher.Write(job.chunk[8:]) // minus 8 []byte length - h := hasher.Sum(nil) +func (self *PyramidChunker) processChunk(id int64, job *chunkJob) { + log.Debug("pyramid.chunker: processChunk()", "id", id) - newChunk := NewChunk(h, nil) - newChunk.SData = job.chunk - newChunk.Size = job.size + ref, err := self.putter.Put(job.chunk) + if err != nil { + self.errC <- err + } // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) - copy(job.key, h) + copy(job.key, ref) // send off new chunk to storage job.parentWg.Done() - - if chunkC != nil { - chunkC <- newChunk - storageWG.Add(1) - go func() { - defer storageWG.Done() - <-newChunk.dbStoredC - }() - } } -func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC chan *Chunk, quitC chan bool) error { +func (self *PyramidChunker) loadTree() error { log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - chunk := retrieve(key, chunkC, quitC) - if chunk == nil { + chunkData, err := self.getter.Get(Reference(self.key)) + if err != nil 
{ return errLoadingTreeRootChunk } - log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunk.Size, "self.chunkSize", self.chunkSize) + chunkSize := chunkData.Size() + log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "self.chunkSize", self.chunkSize) //if data size is less than a chunk... add a parent with update as pending - if chunk.Size <= self.chunkSize { + if chunkSize <= self.chunkSize { newEntry := &TreeEntry{ level: 0, branchCount: 1, - subtreeSize: uint64(chunk.Size), + subtreeSize: uint64(chunkSize), chunk: make([]byte, self.chunkSize+8), key: make([]byte, self.hashSize), index: 0, updatePending: true, } - copy(newEntry.chunk[8:], chunk.Key) - chunkLevel[0] = append(chunkLevel[0], newEntry) + copy(newEntry.chunk[8:], self.key) + self.chunkLevel[0] = append(self.chunkLevel[0], newEntry) return nil } var treeSize int64 var depth int treeSize = self.chunkSize - for ; treeSize < chunk.Size; treeSize *= self.branches { + for ; treeSize < chunkSize; treeSize *= self.branches { depth++ } log.Trace("pyramid.chunker", "depth", depth) // Add the root chunk entry - branchCount := int64(len(chunk.SData)-8) / self.hashSize + branchCount := int64(len(chunkData)-8) / self.hashSize newEntry := &TreeEntry{ level: depth - 1, branchCount: branchCount, - subtreeSize: uint64(chunk.Size), - chunk: chunk.SData, - key: key, + subtreeSize: uint64(chunkSize), + chunk: chunkData, + key: self.key, index: 0, updatePending: true, } - chunkLevel[depth-1] = append(chunkLevel[depth-1], newEntry) + self.chunkLevel[depth-1] = append(self.chunkLevel[depth-1], newEntry) // Add the rest of the tree for lvl := depth - 1; lvl >= 1; lvl-- { //TODO(jmozah): instead of loading finished branches and then trim in the end, //avoid loading them in the first place - for _, ent := range chunkLevel[lvl] { + for _, ent := range self.chunkLevel[lvl] { branchCount = int64(len(ent.chunk)-8) / self.hashSize for i := int64(0); i < branchCount; i++ { key := ent.chunk[8+(i*self.hashSize) : 8+((i+1)*self.hashSize)] - newChunk := retrieve(key, chunkC, quitC) - if newChunk == nil { + newChunkData, err := self.getter.Get(Reference(key)) + if err != nil { return errLoadingTreeChunk } - bewBranchCount := int64(len(newChunk.SData)-8) / self.hashSize + newChunkSize := newChunkData.Size() + bewBranchCount := int64(len(newChunkData)-8) / self.hashSize newEntry := &TreeEntry{ level: lvl - 1, branchCount: bewBranchCount, - subtreeSize: uint64(newChunk.Size), - chunk: newChunk.SData, + subtreeSize: uint64(newChunkSize), + chunk: newChunkData, key: key, index: 0, updatePending: true, } - chunkLevel[lvl-1] = append(chunkLevel[lvl-1], newEntry) + self.chunkLevel[lvl-1] = append(self.chunkLevel[lvl-1], newEntry) } // We need to get only the right most unfinished branch.. 
so trim all finished branches - if int64(len(chunkLevel[lvl-1])) >= self.branches { - chunkLevel[lvl-1] = nil + if int64(len(self.chunkLevel[lvl-1])) >= self.branches { + self.chunkLevel[lvl-1] = nil } } } @@ -373,22 +383,23 @@ func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC return nil } -func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEntry, data io.Reader, rootKey []byte, quitC chan bool, wg *sync.WaitGroup, jobC chan *chunkJob, chunkC chan *Chunk, errC chan error, storageWG *sync.WaitGroup) { +func (self *PyramidChunker) prepareChunks(isAppend bool) { log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend) - defer wg.Done() + defer self.wg.Done() chunkWG := &sync.WaitGroup{} self.incrementWorkerCount() - go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG) + go self.processor(self.workerCount) parent := NewTreeEntry(self) - var unfinishedChunk *Chunk + var unfinishedChunkData ChunkData + var unfinishedChunkSize int64 - if isAppend && len(chunkLevel[0]) != 0 { - lastIndex := len(chunkLevel[0]) - 1 - ent := chunkLevel[0][lastIndex] + if isAppend && len(self.chunkLevel[0]) != 0 { + lastIndex := len(self.chunkLevel[0]) - 1 + ent := self.chunkLevel[0][lastIndex] if ent.branchCount < self.branches { parent = &TreeEntry{ @@ -404,12 +415,17 @@ func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEnt lastBranch := parent.branchCount - 1 lastKey := parent.chunk[8+lastBranch*self.hashSize : 8+(lastBranch+1)*self.hashSize] - unfinishedChunk = retrieve(lastKey, chunkC, quitC) - if unfinishedChunk.Size < self.chunkSize { - parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunk.Size) + var err error + unfinishedChunkData, err = self.getter.Get(lastKey) + if err != nil { + self.errC <- err + } + unfinishedChunkSize = unfinishedChunkData.Size() + if unfinishedChunkSize < self.chunkSize { + parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize) parent.branchCount = parent.branchCount - 1 } else { - unfinishedChunk = nil + unfinishedChunkData = nil } } } @@ -420,15 +436,15 @@ func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEnt var readBytes int - if unfinishedChunk != nil { - copy(chunkData, unfinishedChunk.SData) - readBytes += int(unfinishedChunk.Size) - unfinishedChunk = nil + if unfinishedChunkData != nil { + copy(chunkData, unfinishedChunkData) + readBytes += int(unfinishedChunkSize) + unfinishedChunkData = nil log.Trace("pyramid.chunker: found unfinished chunk", "readBytes", readBytes) } var res []byte - res, err = ioutil.ReadAll(io.LimitReader(data, int64(len(chunkData)-(8+readBytes)))) + res, err = ioutil.ReadAll(io.LimitReader(self.reader, int64(len(chunkData)-(8+readBytes)))) // hack for ioutil.ReadAll: // a successful call to ioutil.ReadAll returns err == nil, not err == EOF, whereas we @@ -447,21 +463,21 @@ func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEnt // Data is exactly one chunk.. pick the last chunk key as root chunkWG.Wait() lastChunksKey := parent.chunk[8 : 8+self.hashSize] - copy(rootKey, lastChunksKey) + copy(self.rootKey, lastChunksKey) break } } else { - close(quitC) + close(self.quitC) break } } // Data ended in chunk boundary.. 
just signal to start bulding tree if readBytes == 0 { - self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey) + self.buildTree(isAppend, parent, chunkWG, true) break } else { - pkey := self.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG, jobC, quitC) + pkey := self.enqueueDataChunk(chunkData, uint64(readBytes), parent, chunkWG) // update tree related parent data structures parent.subtreeSize += uint64(readBytes) @@ -473,40 +489,39 @@ func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEnt // only one data chunk .. so dont add any parent chunk if parent.branchCount <= 1 { chunkWG.Wait() - copy(rootKey, pkey) + copy(self.rootKey, pkey) break } - self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey) + self.buildTree(isAppend, parent, chunkWG, true) break } if parent.branchCount == self.branches { - self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, false, rootKey) + self.buildTree(isAppend, parent, chunkWG, false) parent = NewTreeEntry(self) } } workers := self.getWorkerCount() - if int64(len(jobC)) > workers && workers < ChunkProcessors { + if int64(len(self.jobC)) > workers && workers < ChunkProcessors { self.incrementWorkerCount() - storageWG.Add(1) - go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG) + go self.processor(self.workerCount) } } } -func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool, rootKey []byte) { +func (self *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) { chunkWG.Wait() - self.enqueueTreeChunk(chunkLevel, ent, chunkWG, jobC, quitC, last) + self.enqueueTreeChunk(ent, chunkWG, last) compress := false endLvl := self.branches for lvl := int64(0); lvl < self.branches; lvl++ { - lvlCount := int64(len(chunkLevel[lvl])) + lvlCount := int64(len(self.chunkLevel[lvl])) if lvlCount >= self.branches { endLvl = lvl + 1 compress = true @@ -523,9 +538,9 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, for lvl := int64(ent.level); lvl < endLvl; lvl++ { - lvlCount := int64(len(chunkLevel[lvl])) + lvlCount := int64(len(self.chunkLevel[lvl])) if lvlCount == 1 && last { - copy(rootKey, chunkLevel[lvl][0].key) + copy(self.rootKey, self.chunkLevel[lvl][0].key) return } @@ -538,9 +553,9 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, var nextLvlCount int64 var tempEntry *TreeEntry - if len(chunkLevel[lvl+1]) > 0 { - nextLvlCount = int64(len(chunkLevel[lvl+1]) - 1) - tempEntry = chunkLevel[lvl+1][nextLvlCount] + if len(self.chunkLevel[lvl+1]) > 0 { + nextLvlCount = int64(len(self.chunkLevel[lvl+1]) - 1) + tempEntry = self.chunkLevel[lvl+1][nextLvlCount] } if isAppend && tempEntry != nil && tempEntry.updatePending { updateEntry := &TreeEntry{ @@ -554,11 +569,11 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, } for index := int64(0); index < lvlCount; index++ { updateEntry.branchCount++ - updateEntry.subtreeSize += chunkLevel[lvl][index].subtreeSize - copy(updateEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], chunkLevel[lvl][index].key[:self.hashSize]) + updateEntry.subtreeSize += self.chunkLevel[lvl][index].subtreeSize + copy(updateEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], self.chunkLevel[lvl][index].key[:self.hashSize]) } - self.enqueueTreeChunk(chunkLevel, 
updateEntry, chunkWG, jobC, quitC, last) + self.enqueueTreeChunk(updateEntry, chunkWG, last) } else { @@ -575,13 +590,13 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, index := int64(0) for i := startCount; i < endCount; i++ { - entry := chunkLevel[lvl][i] + entry := self.chunkLevel[lvl][i] newEntry.subtreeSize += entry.subtreeSize copy(newEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], entry.key[:self.hashSize]) index++ } - self.enqueueTreeChunk(chunkLevel, newEntry, chunkWG, jobC, quitC, last) + self.enqueueTreeChunk(newEntry, chunkWG, last) } @@ -590,15 +605,15 @@ func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, if !isAppend { chunkWG.Wait() if compress { - chunkLevel[lvl] = nil + self.chunkLevel[lvl] = nil } } } } -func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool) { - if ent != nil { +func (self *PyramidChunker) enqueueTreeChunk(ent *TreeEntry, chunkWG *sync.WaitGroup, last bool) { + if ent != nil && ent.branchCount > 0 { // wait for data chunks to get over before processing the tree chunk if last { @@ -609,29 +624,29 @@ func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *Tre ent.key = make([]byte, self.hashSize) chunkWG.Add(1) select { - case jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*self.hashSize+8], int64(ent.subtreeSize), chunkWG, TreeChunk, 0}: - case <-quitC: + case self.jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*self.hashSize+8], chunkWG}: + case <-self.quitC: } // Update or append based on weather it is a new entry or being reused if ent.updatePending { chunkWG.Wait() - chunkLevel[ent.level][ent.index] = ent + self.chunkLevel[ent.level][ent.index] = ent } else { - chunkLevel[ent.level] = append(chunkLevel[ent.level], ent) + self.chunkLevel[ent.level] = append(self.chunkLevel[ent.level], ent) } } } -func (self *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool) Key { +func (self *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup) Key { binary.LittleEndian.PutUint64(chunkData[:8], size) pkey := parent.chunk[8+parent.branchCount*self.hashSize : 8+(parent.branchCount+1)*self.hashSize] chunkWG.Add(1) select { - case jobC <- &chunkJob{pkey, chunkData[:size+8], int64(size), chunkWG, DataChunk, -1}: - case <-quitC: + case self.jobC <- &chunkJob{pkey, chunkData[:size+8], chunkWG}: + case <-self.quitC: } return pkey diff --git a/swarm/storage/types.go b/swarm/storage/types.go index f0bc49a0500c..b655fe0e2513 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -32,6 +32,7 @@ import ( ) const MaxPO = 7 +const KeyLength = 32 type Hasher func() hash.Hash type SwarmHasher func() SwarmHash @@ -223,9 +224,8 @@ func (c *Chunk) WaitToStore() { func FakeChunk(size int64, count int, chunks []*Chunk) int { var i int hasher := MakeHashFunc(SHA3Hash)() - chunksize := getDefaultChunkSize() - if size > chunksize { - size = chunksize + if size > DefaultChunkSize { + size = DefaultChunkSize } for i = 0; i < count; i++ { @@ -241,79 +241,6 @@ func FakeChunk(size int64, count int, chunks []*Chunk) int { return i } -func getDefaultChunkSize() int64 { - return DefaultBranches * int64(MakeHashFunc(SHA3Hash)().Size()) - -} - -/* -The ChunkStore interface is implemented by : - -- MemStore: a memory cache -- 
DbStore: local disk/db store -- LocalStore: a combination (sequence of) memStore and dbStore -- NetStore: cloud storage abstraction layer -- DPA: local requests for swarm storage and retrieval -*/ -type ChunkStore interface { - Put(*Chunk) // effectively there is no error even if there is an error - Get(Key) (*Chunk, error) - Close() -} - -/* -Chunker is the interface to a component that is responsible for disassembling and assembling larger data and indended to be the dependency of a DPA storage system with fixed maximum chunksize. - -It relies on the underlying chunking model. - -When calling Split, the caller provides a channel (chan *Chunk) on which it receives chunks to store. The DPA delegates to storage layers (implementing ChunkStore interface). - -Split returns an error channel, which the caller can monitor. -After getting notified that all the data has been split (the error channel is closed), the caller can safely read or save the root key. Optionally it times out if not all chunks get stored or not the entire stream of data has been processed. By inspecting the errc channel the caller can check if any explicit errors (typically IO read/write failures) occurred during splitting. - -When calling Join with a root key, the caller gets returned a seekable lazy reader. The caller again provides a channel on which the caller receives placeholder chunks with missing data. The DPA is supposed to forward this to the chunk stores and notify the chunker if the data has been delivered (i.e. retrieved from memory cache, disk-persisted db or cloud based swarm delivery). As the seekable reader is used, the chunker then puts these together the relevant parts on demand. -*/ -type Splitter interface { - /* - When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. - New chunks to store are coming to caller via the chunk storage channel, which the caller provides. - wg is a Waitgroup (can be nil) that can be used to block until the local storage finishes - The caller gets returned an error channel, if an error is encountered during splitting, it is fed to errC error channel. - A closed error signals process completion at which point the key can be considered final if there were no errors. - */ - Split(io.Reader, int64, chan *Chunk) (Key, func(), error) - - /* This is the first step in making files mutable (not chunks).. - Append allows adding more data chunks to the end of the already existsing file. - The key for the root chunk is supplied to load the respective tree. - Rest of the parameters behave like Split. - */ - Append(Key, io.Reader, chan *Chunk) (Key, func(), error) -} - -type Joiner interface { - /* - Join reconstructs original content based on a root key. - When joining, the caller gets returned a Lazy SectionReader, which is - seekable and implements on-demand fetching of chunks as and where it is read. - New chunks to retrieve are coming to caller via the Chunk channel, which the caller provides. - If an error is encountered during joining, it appears as a reader error. - The SectionReader. - As a result, partial reads from a document are possible even if other parts - are corrupt or lost. - The chunks are not meant to be validated by the chunker when joining. This - is because it is left to the DPA to decide which sources are trusted. 
- */ - Join(key Key, chunkC chan *Chunk, depth int) LazySectionReader -} - -type Chunker interface { - Joiner - Splitter - // returns the key length - // KeySize() int64 -} - // Size, Seek, Read, ReadAt type LazySectionReader interface { Size(chan bool) (int64, error) @@ -329,3 +256,32 @@ type LazyTestSectionReader struct { func (self *LazyTestSectionReader) Size(chan bool) (int64, error) { return self.SectionReader.Size(), nil } + +type ChunkData []byte + +type Reference []byte + +// Putter is responsible to store data and create a reference for it +type Putter interface { + Put(ChunkData) (Reference, error) + // RefSize returns the length of the Reference created by this Putter + RefSize() int64 + // Close is to indicate that no more chunk data will be Put on this Putter + Close() + // Wait returns if all data has been store and the Close() was called. + Wait() +} + +// Getter is an interface to retrieve a chunk's data by its reference +type Getter interface { + Get(Reference) (ChunkData, error) +} + +// NOTE: this returns invalid data if chunk is encrypted +func (c ChunkData) Size() int64 { + return int64(binary.LittleEndian.Uint64(c[:8])) +} + +func (c ChunkData) Data() []byte { + return c[8:] +} diff --git a/swarm/swarm.go b/swarm/swarm.go index e76ca8848f3d..146412ad33c3 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -117,7 +117,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. } log.Debug(fmt.Sprintf("Setting up Swarm service components")) - hash := storage.MakeHashFunc(config.ChunkerParams.Hash) + hash := storage.MakeHashFunc(config.DPAParams.Hash) self.lstore, err = storage.NewLocalStore(hash, config.StoreParams, common.Hex2Bytes(config.BzzKey), mockStore) if err != nil { return @@ -165,7 +165,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. dpaChunkStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve) log.Debug(fmt.Sprintf("-> Local Access to Swarm")) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - self.dpa = storage.NewDPA(dpaChunkStore, self.config.ChunkerParams) + self.dpa = storage.NewDPA(dpaChunkStore, self.config.DPAParams) log.Debug(fmt.Sprintf("-> Content Store API")) // Pss = postal service over swarm (devp2p over bzz) @@ -361,9 +361,6 @@ func (self *Swarm) Start(srv *p2p.Server) error { log.Info("Pss started") } - self.dpa.Start() - log.Debug(fmt.Sprintf("Swarm DPA started")) - // start swarm http proxy server if self.config.Port != "" { addr := net.JoinHostPort(self.config.ListenAddr, self.config.Port) @@ -404,7 +401,6 @@ func (self *Swarm) updateGauges() { // implements the node.Service interface // stops all component services. func (self *Swarm) Stop() error { - self.dpa.Stop() if self.ps != nil { self.ps.Stop() } diff --git a/swarm/testutil/http.go b/swarm/testutil/http.go index 2de0c0f96611..f253d13a66cc 100644 --- a/swarm/testutil/http.go +++ b/swarm/testutil/http.go @@ -57,12 +57,7 @@ func NewTestSwarmServer(t *testing.T) *TestSwarmServer { os.RemoveAll(dir) t.Fatal(err) } - chunker := storage.NewTreeChunker(storage.NewChunkerParams()) - dpa := &storage.DPA{ - Chunker: chunker, - ChunkStore: localStore, - } - dpa.Start() + dpa := storage.NewDPA(localStore, storage.NewDPAParams()) // mutable resources test setup resourceDir, err := ioutil.TempDir("", "swarm-resource-test") @@ -85,7 +80,6 @@ func NewTestSwarmServer(t *testing.T) *TestSwarmServer { cleanup: func() { srv.Close() rh.Close() - dpa.Stop() os.RemoveAll(dir) os.RemoveAll(resourceDir) },
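To close, a sketch of the new Putter/Getter plumbing end to end: a toy store (keccak256 of the chunk data as the reference, kept in a map) standing in for the real hasherStore, fed to PyramidSplit and read back through TreeJoin. The names toyStore and newToyStore, the random payload and the main wrapper are invented for illustration; only PyramidSplit, TreeJoin and the interface methods come from the patch.

package main

import (
	"bytes"
	"crypto/rand"
	"fmt"
	"io"
	"sync"

	"github.com/ethereum/go-ethereum/crypto/sha3"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// toyStore implements the new Putter and Getter interfaces with a plain
// keccak256-keyed map. It only illustrates the contract the pyramid chunker
// relies on; it is not part of the patch (the real implementation is hasherStore).
type toyStore struct {
	mu     sync.Mutex
	chunks map[string]storage.ChunkData
}

func newToyStore() *toyStore {
	return &toyStore{chunks: make(map[string]storage.ChunkData)}
}

// Put hashes the chunk data and records a copy of it; the hash is the reference.
func (s *toyStore) Put(data storage.ChunkData) (storage.Reference, error) {
	h := sha3.NewKeccak256()
	h.Write(data)
	ref := h.Sum(nil)

	s.mu.Lock()
	defer s.mu.Unlock()
	s.chunks[string(ref)] = append(storage.ChunkData{}, data...)
	return storage.Reference(ref), nil
}

// Get looks a chunk up by its reference.
func (s *toyStore) Get(ref storage.Reference) (storage.ChunkData, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	data, ok := s.chunks[string(ref)]
	if !ok {
		return nil, fmt.Errorf("chunk not found")
	}
	return data, nil
}

// RefSize is the keccak256 digest length; it determines the chunker's branching.
func (s *toyStore) RefSize() int64 { return 32 }

// Put is synchronous here, so Close and Wait have nothing left to do.
func (s *toyStore) Close() {}
func (s *toyStore) Wait()  {}

func main() {
	store := newToyStore()

	// Roughly three data chunks of illustrative content.
	data := make([]byte, 10000)
	rand.Read(data)

	key, wait, err := storage.PyramidSplit(bytes.NewReader(data), store, store)
	if err != nil {
		panic(err)
	}
	wait()

	// TreeJoin reads the tree back through the same Getter, as DPA.Retrieve does.
	reader := storage.TreeJoin(key, store, 0)
	result := make([]byte, len(data))
	if _, err := reader.ReadAt(result, 0); err != nil && err != io.EOF {
		panic(err)
	}
	fmt.Println("round trip ok:", bytes.Equal(data, result))
}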