Skip to content

Commit

Permalink
enhance: Apply deltalog when scanning binlog (#311)
Browse files Browse the repository at this point in the history
This PR applies deletions recorded in deltalogs (those of the segment itself and of L0 segments)
when scanning binlogs.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Sep 13, 2024
1 parent 51fcbdb commit 3a52682
Showing 1 changed file with 71 additions and 9 deletions.
80 changes: 71 additions & 9 deletions states/scan_binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ScanBinlogParams struct {
MinioAddress string `name:"minioAddr"`
SkipBucketCheck bool `name:"skipBucketCheck" default:"false" desc:"skip bucket exist check due to permission issue"`
Action string `name:"action" default:"count"`
IgnoreDelete bool `name:"ignoreDelete" default:"false" desc:"ignore delete logic"`
IncludeUnhealthy bool `name:"includeUnhealthy" default:"false" desc:"also check dropped segments"`
}

func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogParams) error {
Expand All @@ -50,14 +52,21 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara
fields := make(map[int64]models.FieldSchema) // make([]models.FieldSchema, 0, len(p.Fields))

for _, fieldSchema := range collection.Schema.Fields {
// timestamp field id
if fieldSchema.FieldID == 1 {
fields[fieldSchema.FieldID] = fieldSchema
continue
}
if _, ok := fieldsMap[fieldSchema.Name]; ok {
fmt.Printf("Output Field %s field id %d\n", fieldSchema.Name, fieldSchema.FieldID)
fields[fieldSchema.FieldID] = fieldSchema
}
}

segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool {
return (p.SegmentID == 0 || p.SegmentID == s.ID) && p.CollectionID == s.CollectionID
return (p.SegmentID == 0 || p.SegmentID == s.ID) &&
p.CollectionID == s.CollectionID &&
(p.IncludeUnhealthy || s.State != models.SegmentStateDropped)
})
if err != nil {
return err
Expand All @@ -84,7 +93,54 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara
ids := make(map[any]struct{})
dedupResult := make(map[any]int64)

for _, segment := range segments {
// getObject opens the object-storage entry behind a binlog/deltalog path.
// Every "ROOT_PATH" placeholder in the stored path is substituted with
// rootPath before the object is requested from bucketName.
getObject := func(binlogPath string) (*minio.Object, error) {
	resolved := strings.ReplaceAll(binlogPath, "ROOT_PATH", rootPath)
	opts := minio.GetObjectOptions{}
	return minioClient.GetObject(ctx, bucketName, resolved, opts)
}

l0Segments := lo.Filter(segments, func(segment *models.Segment, _ int) bool {
return segment.Level == models.SegmentLevelL0
})
normalSegments := lo.Filter(segments, func(segment *models.Segment, _ int) bool {
return segment.Level != models.SegmentLevelL0
})

l0DeleteRecords := make(map[any]uint64) // pk => ts

// addDeltaRecords reads every deltalog attached to segment and merges the
// delete entries into recordMap (pk value => delete timestamp). When a
// primary key appears more than once, the largest timestamp is kept.
// It returns the first error hit while fetching or decoding a deltalog.
addDeltaRecords := func(segment *models.Segment, recordMap map[any]uint64) error {
	for _, deltaFieldBinlog := range segment.GetDeltalogs() {
		for _, deltaBinlog := range deltaFieldBinlog.Binlogs {
			deltaObj, err := getObject(deltaBinlog.LogPath)
			if err != nil {
				return err
			}
			reader, err := storage.NewDeltalogReader(deltaObj)
			if err != nil {
				return err
			}
			deltaData, err := reader.NextEventReader(schemapb.DataType(pkField.DataType))
			if err != nil {
				return err
			}
			deltaData.Range(func(pk storage.PrimaryKey, ts uint64) bool {
				// The map is keyed by pk value, so the previously recorded
				// timestamp must be looked up by pk (not by ts); otherwise an
				// older delete could overwrite a newer one.
				pkv := pk.GetValue()
				if old, ok := recordMap[pkv]; !ok || ts > old {
					recordMap[pkv] = ts
				}
				return true
			})
		}
	}
	return nil
}

// Load the delete records of all L0 segments before scanning. Abort on
// failure: scanning with a partial delete set would report deleted rows
// as if they were still live.
for _, segment := range l0Segments {
	if err := addDeltaRecords(segment, l0DeleteRecords); err != nil {
		return err
	}
}

for _, segment := range normalSegments {
deletedRecords := make(map[any]uint64) // pk => ts
addDeltaRecords(segment, deletedRecords)

var pkBinlog *models.FieldBinlog
targetFieldBinlogs := []*models.FieldBinlog{}
for _, fieldBinlog := range segment.GetBinlogs() {
Expand All @@ -97,10 +153,6 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara
}
}

getObject := func(binlogPath string) (*minio.Object, error) {
logPath := strings.Replace(binlogPath, "ROOT_PATH", rootPath, -1)
return minioClient.GetObject(ctx, bucketName, logPath, minio.GetObjectOptions{})
}
if pkBinlog == nil && segment.State != models.SegmentStateGrowing {
fmt.Printf("PK Binlog not found, segment %d\n", segment.ID)
continue
Expand All @@ -122,6 +174,16 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara
}

err = s.scanBinlogs(pkObject, fieldObjects, func(pk storage.PrimaryKey, offset int, values map[int64]any) error {
pkv := pk.GetValue()
if !p.IgnoreDelete {
ts := values[1].(int64)
if deletedRecords[pkv] > uint64(ts) {
return nil
}
if l0DeleteRecords[pkv] > uint64(ts) {
return nil
}
}
if len(p.Expr) != 0 {
env := lo.MapKeys(values, func(_ any, fid int64) string {
return fields[fid].Name
Expand Down Expand Up @@ -153,11 +215,11 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara
fmt.Printf("entry found, segment %d offset %d, pk: %v\n", segment.ID, offset, pk.GetValue())
fmt.Printf("binlog batch %d, pk binlog %s\n", idx, binlog.LogPath)
case "dedup":
_, ok := ids[pk.GetValue()]
_, ok := ids[pkv]
if ok {
dedupResult[pk.GetValue()]++
dedupResult[pkv]++
}
ids[pk.GetValue()] = struct{}{}
ids[pkv] = struct{}{}
}
return nil
})
Expand Down

0 comments on commit 3a52682

Please sign in to comment.