Skip to content

Commit

Permalink
enhance: load delta logs concurrently (#29623)
Browse files Browse the repository at this point in the history
This pr will make milvus load delta logs concurrently, which should
decrease the latency of loading a segment.
/kind improvement

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
  • Loading branch information
longjiquan authored Jan 7, 2024
1 parent d07197a commit 20fb847
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
Expand Down Expand Up @@ -797,23 +798,35 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
)
dCodec := storage.DeleteCodec{}
var blobs []*storage.Blob
var futures []*conc.Future[any]
for _, deltaLog := range deltaLogs {
for _, bLog := range deltaLog.GetBinlogs() {
bLog := bLog
// the segment has applied the delta logs, skip it
if bLog.GetTimestampTo() > 0 && // this field may be missed in legacy versions
bLog.GetTimestampTo() < segment.LastDeltaTimestamp() {
continue
}
value, err := loader.cm.Read(ctx, bLog.GetLogPath())
if err != nil {
return err
}
blob := &storage.Blob{
Key: bLog.GetLogPath(),
Value: value,
}
blobs = append(blobs, blob)
future := GetLoadPool().Submit(func() (any, error) {
value, err := loader.cm.Read(ctx, bLog.GetLogPath())
if err != nil {
return nil, err
}
blob := &storage.Blob{
Key: bLog.GetLogPath(),
Value: value,
}
return blob, nil
})
futures = append(futures, future)
}
}
for _, future := range futures {
blob, err := future.Await()
if err != nil {
return err
}
blobs = append(blobs, blob.(*storage.Blob))
}
if len(blobs) == 0 {
log.Info("there are no delta logs saved with segment, skip loading delete record")
Expand Down

0 comments on commit 20fb847

Please sign in to comment.