Skip to content

Commit

Permalink
*: support renew lease for read operation on cached table (#29840)
Browse files Browse the repository at this point in the history
  • Loading branch information
JayLZhou authored Nov 22, 2021
1 parent e47f0f5 commit c48db39
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 26 deletions.
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) {
t4.State = model.StatePublic
db1.Tables = append(db1.Tables, t4)

builder, err := infoschema.NewBuilder(store).InitWithDBInfos(
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
[]*model.DBInfo{db1, db2, dbP},
nil,
[]*model.PolicyInfo{p1, p2, p3, p4, p5},
Expand Down
31 changes: 24 additions & 7 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ type Domain struct {
serverID uint64
serverIDSession *concurrency.Session
isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false.

onClose func()
renewLeaseCh chan func() // It is used to call the renewLease function of the cache table.
onClose func()
}

// loadInfoSchema loads infoschema at startTS.
Expand Down Expand Up @@ -159,7 +159,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, currentSchemaVersion, nil, err
}

newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}
builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest())
builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithOldInfoSchema(do.infoCache.GetLatest())
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
Expand All @@ -287,6 +287,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
actions = append(actions, uint64(1<<diff.Type))
}
}

is := builder.Build()
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
Expand Down Expand Up @@ -406,7 +407,6 @@ func (do *Domain) Reload() error {

// lease renew, so it must be executed despite it is cache or not
do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes)

lease := do.DDL().GetLease()
sub := time.Since(startTime)
// Reload interval is lease / 2, if load schema time elapses more than this interval,
Expand Down Expand Up @@ -700,6 +700,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
indexUsageSyncLease: idxUsageSyncLease,
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
renewLeaseCh: make(chan func(), 10),
}

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
Expand Down Expand Up @@ -824,11 +825,11 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
}
do.wg.Add(3)
do.wg.Add(4)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
go do.renewLease()
go do.globalConfigSyncerKeeper()

if !skipRegisterToDashboard {
do.wg.Add(1)
go do.topologySyncerKeeper()
Expand Down Expand Up @@ -1729,6 +1730,22 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {
do.infoCache.Insert(is, 0)
}

func (do *Domain) renewLease() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("renew lease goroutine exited.")
}()
for {
select {
case <-do.exit:
close(do.renewLeaseCh)
return
case op := <-do.renewLeaseCh:
op()
}
}
}

func init() {
initByLDFlagsForGlobalKill()
}
Expand Down
2 changes: 1 addition & 1 deletion executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu
}

func newSlowQueryRetriever() (*slowQueryRetriever, error) {
newISBuilder, err := infoschema.NewBuilder(nil).InitWithDBInfos(nil, nil, nil, 0)
newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0)
if err != nil {
return nil, err
}
Expand Down
23 changes: 20 additions & 3 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Builder struct {
// TODO: store is only used by autoid allocators
// detach allocators from storage, use passed transaction in the feature
store kv.Storage
// TODO: renewLeaseCh is only used to pass data between table and domain
renewLeaseCh chan func()
}

// ApplyDiff applies SchemaDiff to the new InfoSchema.
Expand Down Expand Up @@ -438,7 +440,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
}
}
}
tbl, err := tables.TableFromMeta(allocs, tblInfo)
tbl, err := b.tableFromMeta(allocs, tblInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -601,7 +603,7 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.
}

for _, di := range dbInfos {
err := b.createSchemaTablesForDB(di, tables.TableFromMeta)
err := b.createSchemaTablesForDB(di, b.tableFromMeta)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -622,6 +624,20 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.
return b, nil
}

func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) {
ret, err := tables.TableFromMeta(alloc, tblInfo)
if err != nil {
return nil, errors.Trace(err)
}
if t, ok := ret.(table.CachedTable); ok {
err = t.Init(b.renewLeaseCh)
if err != nil {
return nil, errors.Trace(err)
}
}
return ret, nil
}

type tableFromMetaFunc func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error)

func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableFromMetaFunc) error {
Expand Down Expand Up @@ -658,7 +674,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc)
}

// NewBuilder creates a new Builder with a Handle.
func NewBuilder(store kv.Storage) *Builder {
func NewBuilder(store kv.Storage, renewCh chan func()) *Builder {
return &Builder{
store: store,
is: &infoSchema{
Expand All @@ -667,6 +683,7 @@ func NewBuilder(store kv.Storage) *Builder {
ruleBundleMap: map[string]*placement.Bundle{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
},
renewLeaseCh: renewCh,
}
}

Expand Down
6 changes: 3 additions & 3 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestBasic(t *testing.T) {
})
require.NoError(t, err)

builder, err := infoschema.NewBuilder(dom.Store()).InitWithDBInfos(dbInfos, nil, nil, 1)
builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1)
require.NoError(t, err)

txn, err := store.Begin()
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestInfoTables(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down Expand Up @@ -326,7 +326,7 @@ func TestGetBundle(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down
2 changes: 2 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table
type CachedTable interface {
Table

Init(renewCh chan func()) error

// TryReadFromCache checks if the cache table is readable.
TryReadFromCache(ts uint64) kv.MemBuffer

Expand Down
61 changes: 54 additions & 7 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
Expand All @@ -29,6 +30,16 @@ import (
"github.com/tikv/client-go/v2/tikv"
)

// RenewLeaseType define the type for renew lease.
type RenewLeaseType int

const (
// RenewReadLease means renew read lease.
RenewReadLease RenewLeaseType = iota + 1
// RenewWriteLease means renew write lease.
RenewWriteLease
)

var (
_ table.Table = &cachedTable{}
_ table.CachedTable = &cachedTable{}
Expand All @@ -38,6 +49,7 @@ type cachedTable struct {
TableCommon
cacheData atomic.Value
handle StateRemote
renewCh chan func()
}

// cacheData pack the cache data and lease.
Expand Down Expand Up @@ -72,31 +84,48 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer {
}
data := tmp.(*cacheData)
if ts >= data.Start && ts < data.Lease {
leaseTime := oracle.GetTimeFromTS(data.Lease)
nowTime := oracle.GetTimeFromTS(ts)
distance := leaseTime.Sub(nowTime)
// TODO make this configurable in the following PRs
if distance >= 0 && distance <= (1*time.Second) {
c.renewCh <- c.renewLease(ts, RenewReadLease, data)
}
return data
}
return nil
}

var mockStateRemote = struct {
// MockStateRemote represents the information of stateRemote.
// Exported it only for testing.
var MockStateRemote = struct {
Ch chan remoteTask
Data *mockStateRemoteData
}{}

// NewCachedTable creates a new CachedTable Instance
func NewCachedTable(tbl *TableCommon) (table.Table, error) {
if mockStateRemote.Data == nil {
mockStateRemote.Data = newMockStateRemoteData()
mockStateRemote.Ch = make(chan remoteTask, 100)
go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch)
if MockStateRemote.Data == nil {
MockStateRemote.Data = newMockStateRemoteData()
MockStateRemote.Ch = make(chan remoteTask, 100)
go mockRemoteService(MockStateRemote.Data, MockStateRemote.Ch)
}

ret := &cachedTable{
TableCommon: *tbl,
handle: &mockStateRemoteHandle{mockStateRemote.Ch},
handle: &mockStateRemoteHandle{MockStateRemote.Ch},
renewCh: make(chan func()),
}

return ret, nil
}

// Init is an extra operation for cachedTable after TableFromMeta,
// Because cachedTable need some additional parameter that can't be passed in TableFromMeta.
func (c *cachedTable) Init(renewCh chan func()) error {
c.renewCh = renewCh
return nil
}

func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, error) {
buffer, err := newMemBuffer(store)
if err != nil {
Expand Down Expand Up @@ -203,3 +232,21 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
}
return c.TableCommon.RemoveRecord(ctx, h, r)
}

func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() {
return func() {
tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.RenewLease(tid, ts, lease, op)
if err != nil {
log.Warn("Renew read lease error")
}
if succ {
c.cacheData.Store(&cacheData{
Start: data.Start,
Lease: lease,
MemBuffer: data,
})
}
}
}
30 changes: 29 additions & 1 deletion table/tables/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -358,7 +361,7 @@ func TestCacheTableBatchPointGet(t *testing.T) {
tk.MustExec("insert into bp_cache_tmp1 values(2, 12, 102)")
tk.MustExec("insert into bp_cache_tmp1 values(3, 13, 103)")
tk.MustExec("insert into bp_cache_tmp1 values(4, 14, 104)")

tk.MustExec("alter table bp_cache_tmp1 cache")
// check point get out transaction
tk.MustQuery("select * from bp_cache_tmp1 where id in (1, 3)").Check(testkit.Rows("1 11 101", "3 13 103"))
tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 13)").Check(testkit.Rows("1 11 101", "3 13 103"))
Expand Down Expand Up @@ -389,3 +392,28 @@ func TestCacheTableBatchPointGet(t *testing.T) {
tk.MustQuery("select * from bp_cache_tmp1 where id in (1, 4)").Check(testkit.Rows("1 11 101"))
tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101"))
}

func TestRenewLease(t *testing.T) {
// Test RenewLeaseForRead
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
se := tk.Session()
tk.MustExec("create table cache_renew_t (id int)")
tk.MustExec("alter table cache_renew_t cache")
tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t"))
require.NoError(t, err)
var i int
tk.MustExec("select * from cache_renew_t")
_, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID)
for i = 0; i < 20; i++ {
time.Sleep(200 * time.Millisecond)
tk.MustExec("select * from cache_renew_t")
_, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID)
if lease != oldLease {
break
}
}
require.True(t, i < 20)
}
Loading

0 comments on commit c48db39

Please sign in to comment.