Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ endif
.PHONY: test-ci-race
test-ci-race:
ifdef cover
$(GO) test -race -run "[^FLAKY]$$" -coverprofile=cover.out ./...
@bash -c '$(GO) test -count=1 -race -run "[^FLAKY]$$" -coverprofile=cover.out ./... 2>&1 | grep -v "malformed LC_DYSYMTAB" || true; exit $${PIPESTATUS[0]}'
else
$(GO) test -race -run "[^FLAKY]$$" ./...
@bash -c '$(GO) test -count=1 -race -run "[^FLAKY]$$" ./... 2>&1 | grep -v "malformed LC_DYSYMTAB" || true; exit $${PIPESTATUS[0]}'
endif

.PHONY: test-ci-flaky
Expand Down
55 changes: 40 additions & 15 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ func (g *decoderCache) createRemoveCallback(key string) func(error) {
}
}

// getFromCache looks up the decoder stored under key while holding the cache
// mutex. The boolean reports whether key is present in the map; the returned
// getter can be nil even when the key is present.
func (g *decoderCache) getFromCache(key string) (storage.Getter, bool) {
	g.mu.Lock()
	defer g.mu.Unlock()

	decoder, found := g.cache[key]
	return decoder, found
}

// GetOrCreate returns a decoder for the given chunk address
func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter {
// since a recovery decoder is not allowed, simply return the underlying netstore
Expand All @@ -93,9 +101,8 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
}

key := fingerprint(addrs)
g.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my 2 cents: since one mutex lock-unlock cycle turns into multiple, the chance of having race conditions increases. Since multiple callers may now have access to the same critical section at the same time, this might cause other problems elsewhere down the line (once some of the interleaving logic changes).

defer g.mu.Unlock()
d, ok := g.cache[key]
d, ok := g.getFromCache(key)

if ok {
if d == nil {
// The nil value indicates a previous successful recovery
Expand All @@ -105,15 +112,20 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
// Create a factory function that will instantiate the decoder only when needed
recovery := func() storage.Getter {
g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key)

if d, ok := g.getFromCache(key); ok && d != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since getter.New should return almost immediately, wouldn't it be better to just lock once and do both the get and the set, instead of locking twice, checking twice and potentially calling getter.New (and allocating) in vain?
Bear in mind also that getter.New has side effects, as it starts a whole set of goroutines with go d.prefetch(). It would be good to avoid calling it if it's not necessary.

return d
}

newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config)

g.mu.Lock()
defer g.mu.Unlock()
d, ok := g.cache[key]
if ok && d != nil {
if d, ok := g.cache[key]; ok && d != nil {
return d
}
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config)
g.cache[key] = d
return d
g.cache[key] = newGetter
return newGetter
}

return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger)
Expand All @@ -122,9 +134,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
}

removeCallback := g.createRemoveCallback(key)
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config)
g.cache[key] = d
return d
newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config)

// ensure no other goroutine created the same getter
g.mu.Lock()
defer g.mu.Unlock()
if d, ok := g.cache[key]; ok {
return d
}
g.cache[key] = newGetter
return newGetter
}

// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
Expand Down Expand Up @@ -211,8 +230,8 @@ func (j *joiner) ReadAt(buffer []byte, off int64) (read int, err error) {

readLen := min(int64(cap(buffer)), j.span-off)
var bytesRead int64
var eg errgroup.Group
j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, &eg)
eg, ectx := errgroup.WithContext(j.ctx)
j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, eg, ectx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ectx == ctx, so there is no need to add it to the signature: readAtOffset can just select on the struct field.


err = eg.Wait()
if err != nil {
Expand All @@ -230,6 +249,7 @@ func (j *joiner) readAtOffset(
bytesRead *int64,
parity int,
eg *errgroup.Group,
ectx context.Context,
) {
// we are at a leaf data chunk
if subTrieSize <= int64(len(data)) {
Expand Down Expand Up @@ -280,7 +300,12 @@ func (j *joiner) readAtOffset(

func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) {
eg.Go(func() error {
ch, err := g.Get(j.ctx, addr)
select {
case <-ectx.Done():
return ectx.Err()
default:
}
ch, err := g.Get(ectx, addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the Get method already receives the context, wouldn't it be better for it to check whether the context has expired before and after doing anything? This way you get better coverage of the whole codebase instead of having to add the check at every usage of the method elsewhere.

if err != nil {
return err
}
Expand All @@ -293,7 +318,7 @@ func (j *joiner) readAtOffset(
return ErrMalformedTrie
}

j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg)
j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg, ectx)
return nil
})
}(addr, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit)
Expand Down
Loading