Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: upgrade pebble and stabilize a case #6116

Merged
merged 9 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cenkalti/backoff/v4 v4.0.2
github.com/chaos-mesh/go-sqlsmith v0.0.0-20220512075501-53f2916ae240
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c
github.com/cockroachdb/pebble v0.0.0-20220415182917-06c9d3be25b3
github.com/coreos/go-semver v0.3.0
github.com/davecgh/go-spew v1.1.1
github.com/deepmap/oapi-codegen v1.9.0
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ github.com/cockroachdb/errors v1.8.1/go.mod h1:qGwQn6JmZ+oMjuLwjWzUNqblqk0xl4CVV
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5/go.mod h1:JXfQr3d+XO4bL1pxGwKKo09xylQSdZ/mpZ9b2wfVcPs=
github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c h1:ZtrQD4SC7rV+b1apbqCt9pW23DU2KMRPNk8T+YiezPU=
github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c/go.mod h1:JXfQr3d+XO4bL1pxGwKKo09xylQSdZ/mpZ9b2wfVcPs=
github.com/cockroachdb/pebble v0.0.0-20220415182917-06c9d3be25b3 h1:snjwkhKc/ZtYIC/hg6UoT5PrhXcZmCsaB+z0bonMDcU=
github.com/cockroachdb/pebble v0.0.0-20220415182917-06c9d3be25b3/go.mod h1:buxOO9GBtOcq1DiXDpIPYrmxY020K2A8lOrwno5FetU=
github.com/cockroachdb/redact v1.0.8 h1:8QG/764wK+vmEYoOlfobpe12EQcS81ukx/a4hdVMxNw=
github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeSfSaiCbEBZGKODaixqtHM=
Expand Down Expand Up @@ -1603,6 +1603,7 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
5 changes: 3 additions & 2 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ package db
// DB is an interface of a leveldb-like database. It's only used to store and sort CDC events
// so that users must ensure keys can be handled by `DecodeKey` in package `cdc/sorter`.
type DB interface {
// Iterator creates an iterator. lowerTs and upperTs can specified a timestamp range, both boundaries
// are included.
// Iterator creates an iterator.
// lowerTs and upperTs can specify a timestamp range,
// both boundaries are included.
//
// For all items in [lowerBound, upperBound), if their decoded CRTS is not in the range [lowerTs, upperTs],
// they may (but not must) be skipped when fetching from the iterator.
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (p *pebbleDB) DeleteRange(start, end []byte) error {
}

func (p *pebbleDB) Compact(start, end []byte) error {
return p.db.Compact(start, end)
return p.db.Compact(start, end, true)
}

func (p *pebbleDB) Close() error {
Expand Down
57 changes: 41 additions & 16 deletions pkg/db/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package db

import (
"context"
"fmt"
"path/filepath"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tiflow/cdc/sorter/encoding"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
Expand All @@ -27,35 +28,54 @@ import (
func TestIteratorWithTableFilter(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(context.Background(), 1, dbPath, &config.DBConfig{Count: 1},
WithCache(16<<20), WithTableCRTsCollectors())
WithCache(16<<20), WithTableCRTsCollectors(),
// Disable auto compactions to make the case more stable.
func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true },
)
if err != nil {
errmsg := fmt.Sprintf("OpenPebble fail: %v", err)
panic(errmsg)
t.Errorf("OpenPebble failed: %v", err)
}
defer func() { db.Close() }()
defer func() { _ = db.Close() }()

// Put 7 table keys with CRTS=1, and then flush it to L0. The flush is required for generating table properties.
for t := 1; t <= 7; t++ {
key := encoding.EncodeTsKey(1, uint64(t), 1)
for tableID := 1; tableID <= 7; tableID++ {
key := encoding.EncodeTsKey(1, uint64(tableID), 1)
b := db.Batch(1024)
b.Put(key, []byte{'x'})
b.Commit()
if err := b.Commit(); err != nil {
t.Errorf("Put failed: %v", err)
}
}
if err = db.(*pebbleDB).db.Flush(); err != nil {
errmsg := fmt.Sprintf("Flush fail: %v", err)
panic(errmsg)
t.Errorf("Flush failed: %v", err)
}

// Put 9 table keys with CRTS=3, and then flush it to L0.
for t := 1; t <= 9; t++ {
key := encoding.EncodeTsKey(1, uint64(t), 3)
for tableID := 1; tableID <= 9; tableID++ {
key := encoding.EncodeTsKey(1, uint64(tableID), 3)
b := db.Batch(1024)
b.Put(key, []byte{'x'})
b.Commit()
if err := b.Commit(); err != nil {
t.Errorf("Put failed: %v", err)
}
}
if err = db.(*pebbleDB).db.Flush(); err != nil {
errmsg := fmt.Sprintf("Flush fail: %v", err)
panic(errmsg)
t.Errorf("Flush failed: %v", err)
}

// Sleep a while. Automatic compactions shouldn't be triggered.
time.Sleep(time.Second)

// There should be no any compactions but 2 tables at L0.
pebbleInstance := db.(*pebbleDB).db
stats := pebbleInstance.Metrics()
require.Equal(t, int64(0), stats.Compact.Count)
require.Equal(t, int64(2), stats.Levels[0].NumFiles)
// 7 is a pebble internal constant.
// See: https://github.com/cockroachdb/pebble/blob/
// 71d17c2a007bfad5111a229ba325d30251b88a41/internal/manifest/version.go#L579
for level := 1; level < 7; level++ {
require.Equal(t, int64(0), stats.Levels[level].NumFiles)
}

for _, x := range []struct {
Expand All @@ -69,7 +89,12 @@ func TestIteratorWithTableFilter(t *testing.T) {
{lowerTs: 0, upperTs: 10, expectedCount: 16},
{lowerTs: 10, upperTs: 20, expectedCount: 0},
} {
iter := db.Iterator(encoding.EncodeTsKey(1, 0, 0), encoding.EncodeTsKey(1, 10, 0), x.lowerTs, x.upperTs)
iter := db.Iterator(
encoding.EncodeTsKey(1, 0, 0),
encoding.EncodeTsKey(1, 10, 0),
x.lowerTs,
x.upperTs,
)
require.False(t, iter.Valid())
count := 0
valid := iter.Seek(encoding.EncodeTsKey(1, 0, 0))
Expand Down