Skip to content

Commit

Permalink
Merge pull request ethereum#350 from samuil/swarm-network-rewrite
Browse files Browse the repository at this point in the history
storage: wrap closing chunk stored chan in mutex
  • Loading branch information
nonsense authored Mar 29, 2018
2 parents 539806d + a344c2f commit 6d182b8
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 25 deletions.
2 changes: 1 addition & 1 deletion swarm/storage/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *
storeWg.Add(1)
go func() {
defer storeWg.Done()
<-newChunk.dbStored
<-newChunk.dbStoredC
}()
}
}
Expand Down
6 changes: 3 additions & 3 deletions swarm/storage/chunker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
case chunk := <-chunkC:
// self.chunks = append(self.chunks, chunk)
self.chunks[chunk.Key.Hex()] = chunk
close(chunk.dbStored)
chunk.markAsStored()
}

}
Expand Down Expand Up @@ -105,12 +105,12 @@ func (self *chunkerTester) Append(chunker Splitter, rootKey Key, data io.Reader,
if !success {
// Requesting data
self.chunks[chunk.Key.Hex()] = chunk
close(chunk.dbStored)
chunk.markAsStored()
} else {
// getting data
chunk.SData = stored.SData
chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
close(chunk.dbStored)
chunk.markAsStored()
if chunk.C != nil {
close(chunk.C)
}
Expand Down
4 changes: 2 additions & 2 deletions swarm/storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func mput(store ChunkStore, processors int, n int, f func(i int) *Chunk) (hs []K

store.Put(chunk)

<-chunk.dbStored
<-chunk.dbStoredC
}()
}
}()
Expand All @@ -110,7 +110,7 @@ func mput(store ChunkStore, processors int, n int, f func(i int) *Chunk) (hs []K
if _, ok := store.(*MemStore); ok {
fa = func(i int) *Chunk {
chunk := f(i)
close(chunk.dbStored)
chunk.markAsStored()
return chunk
}
}
Expand Down
9 changes: 5 additions & 4 deletions swarm/storage/ldbstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func NewMockDbStore(path string, hash SwarmHasher, capacity uint64, po func(Key)
if err != nil {
return nil, err
}

// replace put and get with mock store functionality
if mockStore != nil {
s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
Expand Down Expand Up @@ -425,7 +426,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
wg.Add(1)
go func() {
defer wg.Done()
<-chunk.dbStored
<-chunk.dbStoredC
}()
count++
}
Expand Down Expand Up @@ -565,12 +566,12 @@ func (s *LDBStore) Put(chunk *Chunk) {
batchC := s.batchC
go func() {
<-batchC
close(chunk.dbStored)
chunk.markAsStored()
}()
} else {
log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Key)
decodeIndex(idata, &index)
close(chunk.dbStored)
chunk.markAsStored()
}
index.Access = s.accessCnt
s.accessCnt++
Expand Down Expand Up @@ -711,7 +712,7 @@ func (s *LDBStore) get(key Key) (chunk *Chunk, err error) {
}

chunk = NewChunk(key, nil)
close(chunk.dbStored)
chunk.markAsStored()
decodeData(data, chunk)
} else {
err = ErrChunkNotFound
Expand Down
2 changes: 1 addition & 1 deletion swarm/storage/ldbstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func testIterator(t *testing.T, mock bool) {
j := i
go func() {
defer wg.Done()
<-chunks[j].dbStored
<-chunks[j].dbStoredC
}()
}

Expand Down
10 changes: 6 additions & 4 deletions swarm/storage/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ func (self *LocalStore) Put(chunk *Chunk) {

chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
c := &Chunk{
Key: Key(append([]byte{}, chunk.Key...)),
SData: append([]byte{}, chunk.SData...),
Size: chunk.Size,
dbStored: chunk.dbStored,
Key: Key(append([]byte{}, chunk.Key...)),
SData: append([]byte{}, chunk.SData...),
Size: chunk.Size,
dbStored: chunk.dbStored,
dbStoredC: chunk.dbStoredC,
dbStoredMu: chunk.dbStoredMu,
}

dbStorePutCounter.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion swarm/storage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (s *MemStore) removeOldest() {

if node.entry.ReqC == nil {
log.Trace(fmt.Sprintf("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log()))
<-node.entry.dbStored
<-node.entry.dbStoredC
log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log()))

memstoreRemoveCounter.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion swarm/storage/pyramid.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJ
storageWG.Add(1)
go func() {
defer storageWG.Done()
<-newChunk.dbStored
<-newChunk.dbStoredC
}()
}
}
Expand Down
2 changes: 1 addition & 1 deletion swarm/storage/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (self *ResourceHandler) update(ctx context.Context, name string, data []byt
self.Put(chunk)
timeout := time.NewTimer(self.storeTimeout)
select {
case <-chunk.dbStored:
case <-chunk.dbStoredC:
case <-timeout.C:
return nil, NewResourceError(ErrIO, "chunk store timeout")
}
Expand Down
31 changes: 24 additions & 7 deletions swarm/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,13 @@ type Chunk struct {
SData []byte // nil if request, to be supplied by dpa
Size int64 // size of the data covered by the subtree encoded in this chunk
//Source Peer // peer
C chan bool // to signal data delivery by the dpa
ReqC chan bool // to signal the request done
dbStored chan bool // never remove a chunk from memStore before it is written to dbStore
errored bool // flag which is set when the chunk request has errored or timeouted
erroredMu sync.Mutex
C chan bool // to signal data delivery by the dpa
ReqC chan bool // to signal the request done
dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore
dbStored bool
dbStoredMu *sync.Mutex
errored bool // flag which is set when the chunk request has errored or timeouted
erroredMu sync.Mutex
}

func (c *Chunk) SetErrored(val bool) {
Expand All @@ -196,11 +198,26 @@ func (c *Chunk) GetErrored() bool {
}

// NewChunk constructs a Chunk for the given key. reqC may be nil when
// the chunk is not the result of a retrieval request. dbStoredC starts
// open and is closed by markAsStored once the chunk has been written to
// the store; dbStoredMu is a pointer so that Chunk copies made by
// stores share the same close-once guard.
func NewChunk(key Key, reqC chan bool) *Chunk {
	c := &Chunk{
		Key:       key,
		ReqC:      reqC,
		dbStoredC: make(chan bool),
	}
	c.dbStoredMu = new(sync.Mutex)
	return c
}

// markAsStored records, exactly once, that the chunk has been written
// to the underlying store. It closes dbStoredC so every waiter (see
// WaitToStore) is released; the dbStored flag, guarded by dbStoredMu,
// makes repeated calls safe — closing an already-closed channel would
// panic.
func (c *Chunk) markAsStored() {
	c.dbStoredMu.Lock()
	defer c.dbStoredMu.Unlock()

	if c.dbStored {
		// A previous call already closed the channel; nothing to do.
		return
	}
	c.dbStored = true
	close(c.dbStoredC)
}

// WaitToStore blocks until the chunk has been persisted, i.e. until
// markAsStored closes dbStoredC. A receive on the closed channel
// returns immediately, so calls made after storage has completed do
// not block.
func (c *Chunk) WaitToStore() {
	<-c.dbStoredC
}

func FakeChunk(size int64, count int, chunks []*Chunk) int {
Expand Down

0 comments on commit 6d182b8

Please sign in to comment.