diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 85b9e606eeb8..d13953022212 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -298,7 +298,7 @@ func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan * storeWg.Add(1) go func() { defer storeWg.Done() - <-newChunk.dbStored + <-newChunk.dbStoredC }() } } diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index 664460494005..c1cd48502908 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -65,7 +65,7 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c case chunk := <-chunkC: // self.chunks = append(self.chunks, chunk) self.chunks[chunk.Key.Hex()] = chunk - close(chunk.dbStored) + chunk.markAsStored() } } @@ -105,12 +105,12 @@ func (self *chunkerTester) Append(chunker Splitter, rootKey Key, data io.Reader, if !success { // Requesting data self.chunks[chunk.Key.Hex()] = chunk - close(chunk.dbStored) + chunk.markAsStored() } else { // getting data chunk.SData = stored.SData chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - close(chunk.dbStored) + chunk.markAsStored() if chunk.C != nil { close(chunk.C) } diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index 52a964114e49..0ad114409562 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -101,7 +101,7 @@ func mput(store ChunkStore, processors int, n int, f func(i int) *Chunk) (hs []K store.Put(chunk) - <-chunk.dbStored + <-chunk.dbStoredC }() } }() @@ -110,7 +110,7 @@ func mput(store ChunkStore, processors int, n int, f func(i int) *Chunk) (hs []K if _, ok := store.(*MemStore); ok { fa = func(i int) *Chunk { chunk := f(i) - close(chunk.dbStored) + chunk.markAsStored() return chunk } } diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 47a4d312b525..5ff555dd7d54 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -167,6 +167,7 @@ func NewMockDbStore(path string, hash SwarmHasher, capacity uint64, po func(Key) if err != nil { return nil, err } + // replace put and get with mock store functionality if mockStore != nil { s.encodeDataFunc = newMockEncodeDataFunc(mockStore) @@ -425,7 +426,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { wg.Add(1) go func() { defer wg.Done() - <-chunk.dbStored + <-chunk.dbStoredC }() count++ } @@ -565,12 +566,12 @@ func (s *LDBStore) Put(chunk *Chunk) { batchC := s.batchC go func() { <-batchC - close(chunk.dbStored) + chunk.markAsStored() }() } else { log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Key) decodeIndex(idata, &index) - close(chunk.dbStored) + chunk.markAsStored() } index.Access = s.accessCnt s.accessCnt++ @@ -711,7 +712,7 @@ func (s *LDBStore) get(key Key) (chunk *Chunk, err error) { } chunk = NewChunk(key, nil) - close(chunk.dbStored) + chunk.markAsStored() decodeData(data, chunk) } else { err = ErrChunkNotFound diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 6caf556c2d15..d7a6d78f8d83 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -182,7 +182,7 @@ func testIterator(t *testing.T, mock bool) { j := i go func() { defer wg.Done() - <-chunks[j].dbStored + <-chunks[j].dbStoredC }() } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index d0fe0627950a..3e19b5046e0a 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -91,10 +91,12 @@ func (self *LocalStore) Put(chunk *Chunk) { chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) c := &Chunk{ - Key: Key(append([]byte{}, chunk.Key...)), - SData: append([]byte{}, chunk.SData...), - Size: chunk.Size, - dbStored: chunk.dbStored, + Key: Key(append([]byte{}, chunk.Key...)), + SData: append([]byte{}, chunk.SData...), + Size: chunk.Size, + dbStored: chunk.dbStored, + dbStoredC: chunk.dbStoredC, + dbStoredMu: chunk.dbStoredMu, } dbStorePutCounter.Inc(1) diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 5a1ff04c57a2..3e5c025daaf6 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -302,7 +302,7 @@ func (s *MemStore) removeOldest() { if node.entry.ReqC == nil { log.Trace(fmt.Sprintf("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())) - <-node.entry.dbStored + <-node.entry.dbStoredC log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())) memstoreRemoveCounter.Inc(1) diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 4fa46a69c671..edb784df34b0 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -285,7 +285,7 @@ func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJ storageWG.Add(1) go func() { defer storageWG.Done() - <-newChunk.dbStored + <-newChunk.dbStoredC }() } } diff --git a/swarm/storage/resource.go b/swarm/storage/resource.go index 2d3c2de9bd9b..6d0070268883 100644 --- a/swarm/storage/resource.go +++ b/swarm/storage/resource.go @@ -662,7 +662,7 @@ func (self *ResourceHandler) update(ctx context.Context, name string, data []byt self.Put(chunk) timeout := time.NewTimer(self.storeTimeout) select { - case <-chunk.dbStored: + case <-chunk.dbStoredC: case <-timeout.C: return nil, NewResourceError(ErrIO, "chunk store timeout") } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 7ea04322e168..f0bc49a0500c 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -174,11 +174,13 @@ type Chunk struct { SData []byte // nil if request, to be supplied by dpa Size int64 // size of the data covered by the subtree encoded in this chunk //Source Peer // peer - C chan bool // to signal data delivery by the dpa - ReqC chan bool // to signal the request done - dbStored chan bool // never remove a chunk from memStore before it is written to dbStore - errored bool // flag which is set when the chunk request has errored or timeouted - erroredMu sync.Mutex + C chan bool // to signal data delivery by the dpa + ReqC chan bool // to signal the request done + dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore + dbStored bool + dbStoredMu *sync.Mutex + errored bool // flag which is set when the chunk request has errored or timeouted + erroredMu sync.Mutex } func (c *Chunk) SetErrored(val bool) { @@ -196,11 +198,26 @@ func (c *Chunk) GetErrored() bool { } func NewChunk(key Key, reqC chan bool) *Chunk { - return &Chunk{Key: key, ReqC: reqC, dbStored: make(chan bool)} + return &Chunk{ + Key: key, + ReqC: reqC, + dbStoredC: make(chan bool), + dbStoredMu: &sync.Mutex{}, + } +} + +func (c *Chunk) markAsStored() { + c.dbStoredMu.Lock() + defer c.dbStoredMu.Unlock() + + if !c.dbStored { + close(c.dbStoredC) + c.dbStored = true + } } func (c *Chunk) WaitToStore() { - <-c.dbStored + <-c.dbStoredC } func FakeChunk(size int64, count int, chunks []*Chunk) int {