-
Notifications
You must be signed in to change notification settings - Fork 379
fix(joiner): resolve race condition #5281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,6 +81,14 @@ func (g *decoderCache) createRemoveCallback(key string) func(error) { | |
| } | ||
| } | ||
|
|
||
| // getFromCache returns the cached decoder for key, and whether an entry exists, while holding the cache lock | ||
| func (g *decoderCache) getFromCache(key string) (storage.Getter, bool) { | ||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
| d, ok := g.cache[key] | ||
| return d, ok | ||
| } | ||
|
|
||
| // GetOrCreate returns a decoder for the given chunk address | ||
| func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter { | ||
| // since a recovery decoder is not allowed, simply return the underlying netstore | ||
|
|
@@ -93,9 +101,8 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. | |
| } | ||
|
|
||
| key := fingerprint(addrs) | ||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
| d, ok := g.cache[key] | ||
| d, ok := g.getFromCache(key) | ||
|
|
||
| if ok { | ||
| if d == nil { | ||
| // The nil value indicates a previous successful recovery | ||
|
|
@@ -105,15 +112,20 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. | |
| // Create a factory function that will instantiate the decoder only when needed | ||
| recovery := func() storage.Getter { | ||
| g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key) | ||
|
|
||
| if d, ok := g.getFromCache(key); ok && d != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. since |
||
| return d | ||
| } | ||
|
|
||
| newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config) | ||
|
|
||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
| d, ok := g.cache[key] | ||
| if ok && d != nil { | ||
| if d, ok := g.cache[key]; ok && d != nil { | ||
| return d | ||
| } | ||
| d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config) | ||
| g.cache[key] = d | ||
| return d | ||
| g.cache[key] = newGetter | ||
| return newGetter | ||
| } | ||
|
|
||
| return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger) | ||
|
|
@@ -122,9 +134,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. | |
| } | ||
|
|
||
| removeCallback := g.createRemoveCallback(key) | ||
| d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config) | ||
| g.cache[key] = d | ||
| return d | ||
| newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config) | ||
|
|
||
| // ensure no other goroutine created the same getter | ||
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
| if d, ok := g.cache[key]; ok { | ||
| return d | ||
| } | ||
| g.cache[key] = newGetter | ||
| return newGetter | ||
| } | ||
|
|
||
| // New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities. | ||
|
|
@@ -211,8 +230,8 @@ func (j *joiner) ReadAt(buffer []byte, off int64) (read int, err error) { | |
|
|
||
| readLen := min(int64(cap(buffer)), j.span-off) | ||
| var bytesRead int64 | ||
| var eg errgroup.Group | ||
| j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, &eg) | ||
| eg, ectx := errgroup.WithContext(j.ctx) | ||
| j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, eg, ectx) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| err = eg.Wait() | ||
| if err != nil { | ||
|
|
@@ -230,6 +249,7 @@ func (j *joiner) readAtOffset( | |
| bytesRead *int64, | ||
| parity int, | ||
| eg *errgroup.Group, | ||
| ectx context.Context, | ||
| ) { | ||
| // we are at a leaf data chunk | ||
| if subTrieSize <= int64(len(data)) { | ||
|
|
@@ -280,7 +300,12 @@ func (j *joiner) readAtOffset( | |
|
|
||
| func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) { | ||
| eg.Go(func() error { | ||
| ch, err := g.Get(j.ctx, addr) | ||
| select { | ||
| case <-ectx.Done(): | ||
| return ectx.Err() | ||
| default: | ||
| } | ||
| ch, err := g.Get(ectx, addr) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If the |
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -293,7 +318,7 @@ func (j *joiner) readAtOffset( | |
| return ErrMalformedTrie | ||
| } | ||
|
|
||
| j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg) | ||
| j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg, ectx) | ||
| return nil | ||
| }) | ||
| }(addr, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just my two cents: since one mutex lock-unlock cycle turns into several, the chance of race conditions increases. Since multiple callers may now have access to the same critical section at the same time, this might cause other problems elsewhere down the line (once some of the interleaving logic changes).