Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions pkg/fileservice/io_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func NewIOMerger() *IOMerger {

var slowIOWaitDuration = time.Second * 10

func (i *IOMerger) waitFunc(key IOMergeKey, ch chan struct{}) func() {
var maxIOWaitDuration = time.Minute

func (i *IOMerger) makeWaitFunc(key IOMergeKey, ch chan struct{}, maxWaitDuration time.Duration) func() {
metric.IOMergerCounterWait.Add(1)
return func() {
t0 := time.Now()
Expand All @@ -55,6 +57,11 @@ func (i *IOMerger) waitFunc(key IOMergeKey, ch chan struct{}) func() {
timer.Stop()
return
case <-timer.C:
if time.Since(t0) > maxWaitDuration {
// don't wait too long
// number of I/O requests may increase, but we don't want to hurt latencies too much.
return
}
logutil.Warn("wait io for too long",
zap.Any("wait", time.Since(t0)),
zap.Any("key", key),
Expand All @@ -64,18 +71,18 @@ func (i *IOMerger) waitFunc(key IOMergeKey, ch chan struct{}) func() {
}
}

func (i *IOMerger) Merge(key IOMergeKey) (done func(), wait func()) {
func (i *IOMerger) Merge(key IOMergeKey, maxWaitDuration time.Duration) (done func(), wait func()) {
if v, ok := i.flying.Load(key); ok {
// wait
return nil, i.waitFunc(key, v.(chan struct{}))
return nil, i.makeWaitFunc(key, v.(chan struct{}), maxWaitDuration)
}

// try initiate
ch := make(chan struct{})
v, loaded := i.flying.LoadOrStore(key, ch)
if loaded {
// not the first request, wait
return nil, i.waitFunc(key, v.(chan struct{}))
return nil, i.makeWaitFunc(key, v.(chan struct{}), maxWaitDuration)
}

// initiated
Expand Down
20 changes: 17 additions & 3 deletions pkg/fileservice/io_merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package fileservice
import (
"sync"
"testing"
"time"
)

func TestIOMerger(t *testing.T) {
Expand All @@ -34,7 +35,7 @@ func TestIOMerger(t *testing.T) {
go func() {
defer wg.Done()
for {
done, wait := merger.Merge(key)
done, wait := merger.Merge(key, time.Second)
if done != nil {
cs = append(cs, c)
c++
Expand Down Expand Up @@ -67,7 +68,7 @@ func BenchmarkIOMergerNoContention(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
done, wait := merger.Merge(key)
done, wait := merger.Merge(key, time.Second)
if done != nil {
done()
} else {
Expand All @@ -84,7 +85,7 @@ func BenchmarkIOMergerParallel(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
done, wait := merger.Merge(key)
done, wait := merger.Merge(key, time.Second)
if done != nil {
done()
} else {
Expand All @@ -93,3 +94,16 @@ func BenchmarkIOMergerParallel(b *testing.B) {
}
})
}

// TestIOMergerMaxWait verifies that a waiting merge request gives up and
// returns once its maxWaitDuration is exceeded, even though the initiating
// request never signals completion while the waiter is blocked.
func TestIOMergerMaxWait(t *testing.T) {
	merger := NewIOMerger()
	key := IOMergeKey{
		Path: "foo",
	}
	// initiate: the first request on a key must get a non-nil done func
	done, _ := merger.Merge(key, time.Second)
	if done == nil {
		t.Fatal("expected first Merge to initiate and return a done func")
	}
	// release the in-flight entry at test end so the key is not leaked
	defer done()
	// a second request on the same in-flight key must get a wait func
	doneAgain, wait := merger.Merge(key, time.Second)
	if doneAgain != nil || wait == nil {
		t.Fatal("expected second Merge to return a wait func")
	}
	// wait must return on its own after maxWaitDuration elapses,
	// since done() has not been called yet
	wait()
}
4 changes: 2 additions & 2 deletions pkg/fileservice/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,10 @@ read_disk_cache:
if mayReadMemoryCache || mayReadDiskCache {
// may read caches, merge
startLock := time.Now()
done, wait := l.ioMerger.Merge(vector.ioMergeKey())
done, wait := l.ioMerger.Merge(vector.ioMergeKey(), maxIOWaitDuration)
if done != nil {
stats.AddLocalFSReadIOMergerTimeConsumption(time.Since(startLock))
defer done()
stats.AddLocalFSReadIOMergerTimeConsumption(time.Since(startLock))
} else {
wait()
stats.AddLocalFSReadIOMergerTimeConsumption(time.Since(startLock))
Expand Down
6 changes: 3 additions & 3 deletions pkg/fileservice/s3_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (s *S3FS) PrefetchFile(ctx context.Context, filePath string) error {

done, _ := s.ioMerger.Merge(IOMergeKey{
Path: filePath,
})
}, maxIOWaitDuration)
if done != nil {
defer done()
} else {
Expand Down Expand Up @@ -596,12 +596,12 @@ read_disk_cache:
// may read caches, merge
LogEvent(ctx, str_ioMerger_Merge_begin)
startLock := time.Now()
done, wait := s.ioMerger.Merge(vector.ioMergeKey())
done, wait := s.ioMerger.Merge(vector.ioMergeKey(), maxIOWaitDuration)
if done != nil {
defer done()
stats.AddS3FSReadIOMergerTimeConsumption(time.Since(startLock))
LogEvent(ctx, str_ioMerger_Merge_initiate)
LogEvent(ctx, str_ioMerger_Merge_end)
defer done()
} else {
LogEvent(ctx, str_ioMerger_Merge_wait)
wait()
Expand Down
Loading