97 changes: 67 additions & 30 deletions go/store/nbs/journal_chunk_source.go
@@ -16,8 +16,11 @@ package nbs

 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
+	"sort"
+	"sync"
 
 	"golang.org/x/sync/errgroup"

@@ -69,44 +72,78 @@ func (s journalChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
 	return ch.Data(), nil
 }
 
-func (s journalChunkSource) getMany(ctx context.Context, _ *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
-	var remaining bool
-	// todo: read planning
-	for i := range reqs {
-		if reqs[i].found {
-			continue
-		}
-		data, err := s.get(ctx, *reqs[i].a, stats)
-		if err != nil {
-			return false, err
-		} else if data != nil {
-			reqs[i].found = true
-			ch := chunks.NewChunkWithHash(hash.Hash(*reqs[i].a), data)
-			found(ctx, &ch)
-		} else {
-			remaining = true
-		}
-	}
-	return remaining, nil
+type journalRecord struct {
+	// r is the journal range for this chunk
+	r Range
+	// idx is the index of this chunk in the shared |reqs| slice
+	idx int
 }
 
-func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
+func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
+	return s.getManyCompressed(ctx, eg, reqs, func(ctx context.Context, cc CompressedChunk) {
+		ch, err := cc.ToChunk()
+		if err != nil {
+			eg.Go(func() error {
+				return err
+			})
+			return
+		}
+		chWHash := chunks.NewChunkWithHash(cc.Hash(), ch.Data())
+		found(ctx, &chWHash)
+	}, stats)
+}

Review thread on the eg.Go(func() error { line above:

Contributor: Unfortunately, I don't think this is race-free. In particular, I don't think it's safe to call eg.Go() on an errgroup where eg.Wait() may have already been called.

Contributor: Actually, I think I might be wrong here... in particular, errgroup is implemented in terms of a sync.WaitGroup, and I thought all positive-delta wg.Add() calls were dangerous after a wg.Wait(), but that's not true:

// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.

Contributor: And by "I think I might be wrong here"... I'm quite sure I'm wrong and your usage is safe / to spec. Sorry for the noise.
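For reference, here is a minimal, self-contained sketch (not from this PR) of the nested eg.Go pattern the thread settles on: a task already running on the errgroup keeps the underlying WaitGroup counter above zero, so scheduling another task is safe even while the caller is already blocked in eg.Wait().

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	eg.Go(func() error {
		// The counter is still > 0 while this task runs, so the
		// positive-delta Add inside this eg.Go call may safely run
		// concurrently with eg.Wait, per the sync.WaitGroup doc
		// quoted in the thread above.
		eg.Go(func() error {
			fmt.Println("nested task")
			return nil
		})
		return nil
	})
	if err := eg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}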

+// getManyCompressed implements chunkReader. Here we (1) synchronously check
+// the journal index for read ranges, (2) record whether any requested chunks
+// are missing from this source, (3) sort the lookups for sequential disk
+// access, and then (4) perform the reads asynchronously. The journal read
+// lock is held until all reads complete, which can be after this function
+// returns.
+func (s journalChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
 	var remaining bool
-	// todo: read planning
-	for i := range reqs {
-		if reqs[i].found {
+	var jReqs []journalRecord
+	var wg sync.WaitGroup
+	s.journal.lock.RLock()
+	for i, r := range reqs {
+		if r.found {
 			continue
 		}
-		cc, err := s.getCompressed(ctx, *reqs[i].a, stats)
-		if err != nil {
-			return false, err
-		} else if cc.IsEmpty() {
+		rang, ok := s.journal.ranges.get(*r.a)
+		if !ok {
 			remaining = true
-		} else {
-			reqs[i].found = true
-			found(ctx, cc)
+			continue
 		}
+		jReqs = append(jReqs, journalRecord{r: rang, idx: i})
+		reqs[i].found = true
 	}
+
+	// sort lookups by journal offset for sequential disk access
+	sort.Slice(jReqs, func(i, j int) bool {
+		return jReqs[i].r.Offset < jReqs[j].r.Offset
+	})
+
+	for i := range jReqs {
+		rec := jReqs[i] // capture a per-iteration copy for the closure
+		// workers run on the parent error group; the local WaitGroup
+		// tracks them so the journal read lock can be released below
+		wg.Add(1)
+		eg.Go(func() error {
+			defer wg.Done()
+			a := reqs[rec.idx].a
+			if cc, err := s.journal.getCompressedChunkAtRange(rec.r, *a); err != nil {
+				return err
+			} else if cc.IsEmpty() {
+				return errors.New("chunk in journal index was empty")
+			} else {
+				found(ctx, cc)
+				return nil
+			}
+		})
+	}
+	go func() {
+		wg.Wait()
+		s.journal.lock.RUnlock()
+	}()
 	return remaining, nil
 }
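Since getManyCompressed now completes its reads asynchronously, a hypothetical package-internal caller would look roughly like the sketch below. The name readFromJournal and the surrounding wiring are assumptions for illustration; the actual call sites live elsewhere in nbs.

// Sketch only: illustrates the contract that |found| may fire after
// getManyCompressed returns, so callers must wait on the shared errgroup.
func readFromJournal(ctx context.Context, src journalChunkSource, reqs []getRecord, stats *Stats) (bool, error) {
	eg, ctx := errgroup.WithContext(ctx)
	remaining, err := src.getManyCompressed(ctx, eg, reqs, func(_ context.Context, cc CompressedChunk) {
		// runs on a worker goroutine, possibly after the call below returns
	}, stats)
	if err != nil {
		return false, err
	}
	// eg.Wait returns once every scheduled read has finished; the journal
	// read lock is released shortly after the last worker completes.
	if err := eg.Wait(); err != nil {
		return false, err
	}
	return remaining, nil // true means some chunks live in other sources
}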

11 changes: 10 additions & 1 deletion go/store/nbs/journal_writer.go
@@ -332,7 +332,16 @@ func (wr *journalWriter) getCompressedChunk(h hash.Hash) (CompressedChunk, error) {
 	}
 	buf := make([]byte, r.Length)
 	if _, err := wr.readAt(buf, int64(r.Offset)); err != nil {
-		return CompressedChunk{}, nil
+		return CompressedChunk{}, err
 	}
 	return NewCompressedChunk(hash.Hash(h), buf)
 }
+
+// getCompressedChunkAtRange reads the CompressedChunk with addr |h| at journal range |r|.
+func (wr *journalWriter) getCompressedChunkAtRange(r Range, h hash.Hash) (CompressedChunk, error) {
+	buf := make([]byte, r.Length)
+	if _, err := wr.readAt(buf, int64(r.Offset)); err != nil {
+		return CompressedChunk{}, err
+	}
+	return NewCompressedChunk(hash.Hash(h), buf)
+}
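For context, Range (used by both functions above) locates a chunk record inside the journal file. A rough sketch of the shape assumed here, with the field names as used above; the actual definition lives elsewhere in the nbs package:

// Assumed sketch, not the real definition: a Range pins down the byte
// span of a chunk record within the journal file.
type Range struct {
	Offset uint64 // byte offset of the record in the journal
	Length uint32 // length of the record in bytes
}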