Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/lightning: add kv writer for external backend #46042

Merged
merged 10 commits into from
Aug 15, 2023
Prev Previous commit
Next Next commit
address comment
  • Loading branch information
tangenta committed Aug 14, 2023
commit 6261a557e914437b33bb43295cc65a24bcde7de8
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ go_library(
"//util/mathutil",
"//util/size",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
Expand Down
29 changes: 18 additions & 11 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strconv"
"time"

"slices"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -32,7 +34,6 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/size"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

// rangePropertiesCollector collects range properties for each range. The zero
Expand Down Expand Up @@ -217,7 +218,7 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
return status(false), err
}

logutil.BgLogger().Info("close writer",
logutil.Logger(ctx).Info("close writer",
zap.Int("writerID", w.writerID),
zap.String("minKey", hex.EncodeToString(w.minKey)),
zap.String("maxKey", hex.EncodeToString(w.maxKey)))
Expand Down Expand Up @@ -250,7 +251,7 @@ func (s status) Flushed() bool {
return bool(s)
}

func (w *Writer) flushKVs(ctx context.Context) error {
func (w *Writer) flushKVs(ctx context.Context) (err error) {
if len(w.writeBatch) == 0 {
return nil
}
Expand All @@ -265,22 +266,28 @@ func (w *Writer) flushKVs(ctx context.Context) error {

defer func() {
w.currentSeq++
err := dataWriter.Close(w.ctx)
err1, err2 := dataWriter.Close(w.ctx), statWriter.Close(w.ctx)
if err != nil {
logutil.BgLogger().Error("close data writer failed", zap.Error(err))
return
}
err = statWriter.Close(w.ctx)
if err != nil {
logutil.BgLogger().Error("close stat writer failed", zap.Error(err))
if err1 != nil {
logutil.Logger(ctx).Error("close data writer failed", zap.Error(err))
err = err1
return
}
if err2 != nil {
logutil.Logger(ctx).Error("close stat writer failed", zap.Error(err))
err = err2
return
}
logutil.BgLogger().Info("flush kv",
logutil.Logger(ctx).Info("flush kv",
zap.Duration("time", time.Since(ts)),
zap.Uint64("bytes", saveBytes),
zap.Any("rate", float64(saveBytes)/1024.0/1024.0/time.Since(ts).Seconds()))
}()

slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) bool {
return bytes.Compare(i.Key, j.Key) < 0
slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})

w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc, w.writerID, w.currentSeq)
Expand Down