Skip to content

Commit

Permalink
table: Add CachedTableSupport and TemporaryTableSupport for `Muta…
Browse files Browse the repository at this point in the history
…teContext` (#54900)

close #54397
  • Loading branch information
lcwangchao authored Jul 26, 2024
1 parent 0353655 commit 2ee8c99
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 54 deletions.
1 change: 1 addition & 0 deletions pkg/table/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/expression/context",
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/meta/autoid",
"//pkg/parser/model",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
Expand Down
72 changes: 66 additions & 6 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
exprctx "github.com/pingcap/tidb/pkg/expression/context"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -48,6 +49,61 @@ type StatisticsSupport interface {
UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols)
}

// CachedTableSupport is used for cached table operations
type CachedTableSupport interface {
// AddCachedTableHandleToTxn adds a cached handle to the current transaction
// to handle cached table when committing txn.
// The handle argument should implement `table.CachedTable` interface, but here is `any` to avoid import cycle.
AddCachedTableHandleToTxn(tableID int64, handle any)
}

// TemporaryTableHandler is used by `table.Table` to handle temporary table.
type TemporaryTableHandler struct {
tblInTxn tableutil.TempTable
data variable.TemporaryTableData
}

// NewTemporaryTableHandler creates a new TemporaryTableHandler
func NewTemporaryTableHandler(tbl tableutil.TempTable, data variable.TemporaryTableData) TemporaryTableHandler {
return TemporaryTableHandler{
tblInTxn: tbl,
data: data,
}
}

// Meta returns the meta
func (h *TemporaryTableHandler) Meta() *model.TableInfo {
return h.tblInTxn.GetMeta()
}

// GetDirtySize returns the size of dirty data in txn of the temporary table
func (h *TemporaryTableHandler) GetDirtySize() int64 {
return h.tblInTxn.GetSize()
}

// GetCommittedSize returns the committed data size of the temporary table
func (h *TemporaryTableHandler) GetCommittedSize() int64 {
if h.data == nil {
return 0
}
return h.data.GetTableSize(h.tblInTxn.GetMeta().ID)
}

// UpdateTxnDeltaSize updates the size of dirty data statistics in txn of the temporary table
func (h *TemporaryTableHandler) UpdateTxnDeltaSize(delta int) {
h.tblInTxn.SetSize(h.tblInTxn.GetSize() + int64(delta))
}

// TemporaryTableSupport is used for temporary table operations
type TemporaryTableSupport interface {
// GetTemporaryTableSizeLimit returns the size limit of a temporary table.
GetTemporaryTableSizeLimit() int64
// AddTemporaryTableToTxn adds a temporary table to txn to mark it is modified
// and txn will handle it when committing.
// It returns a `TemporaryTableHandler` object which provides some extra info for the temporary table.
AddTemporaryTableToTxn(tblInfo *model.TableInfo) (TemporaryTableHandler, bool)
}

// MutateContext is used to when mutating a table.
type MutateContext interface {
AllocatorContext
Expand All @@ -62,9 +118,6 @@ type MutateContext interface {
Txn(active bool) (kv.Transaction, error)
// GetDomainInfoSchema returns the latest information schema in domain
GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema
// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified or should allocate id in the transaction.
TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable
// ConnectionID returns the id of the current connection.
// If the current environment is not in a query from the client, the return value is 0.
ConnectionID() uint64
Expand All @@ -90,11 +143,18 @@ type MutateContext interface {
// GetStatisticsSupport returns a `StatisticsSupport` if the context supports it.
// If the context does not support statistics update, the second return value will be false.
GetStatisticsSupport() (StatisticsSupport, bool)
// GetCachedTableSupport returns a `CachedTableSupport` if the context supports it.
// If the context does not support cached table, the second return value will be false.
GetCachedTableSupport() (CachedTableSupport, bool)
// GetTemporaryTableSupport returns a `TemporaryTableSupport` if the context supports it.
// If the context does not support temporary table, the second return value will be false.
GetTemporaryTableSupport() (TemporaryTableSupport, bool)
}

// AllocatorContext is used to provide context for method `table.Allocators`.
type AllocatorContext interface {
// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified or should allocate id in the transaction.
TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable
// AlternativeAllocators returns an alternative `autoid.Allocators` for the table.
// If the second return value is nil, it means there are no alternative allocators in the context.
// Currently, it provides alternative allocators for temporary tables to alloc IDs in session.
AlternativeAllocators(tbl *model.TableInfo) (autoid.Allocators, bool)
}
4 changes: 3 additions & 1 deletion pkg/table/contextimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/expression/context",
"//pkg/meta/autoid",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/table/context",
"//pkg/util/intest",
"//pkg/util/tableutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
],
Expand All @@ -26,8 +26,10 @@ go_test(
flaky = True,
deps = [
":contextimpl",
"//pkg/parser/model",
"//pkg/sessionctx/binloginfo",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/testkit",
"//pkg/util/mock",
"@com_github_pingcap_tipb//go-binlog",
Expand Down
60 changes: 55 additions & 5 deletions pkg/table/contextimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ package contextimpl
import (
"github.com/pingcap/failpoint"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table/context"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tipb/go-binlog"
)

Expand All @@ -47,10 +47,18 @@ func NewTableContextImpl(sctx sessionctx.Context) *TableContextImpl {
}
}

// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified in the transaction.
func (ctx *TableContextImpl) TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable {
return ctx.vars().GetTemporaryTable(tbl)
// AlternativeAllocators implements the AllocatorContext interface
func (ctx *TableContextImpl) AlternativeAllocators(tbl *model.TableInfo) (allocators autoid.Allocators, ok bool) {
// Use an independent allocator for global temporary tables.
if tbl.TempTableType == model.TempTableGlobal {
if tempTbl := ctx.vars().GetTemporaryTable(tbl); tempTbl != nil {
if alloc := tempTbl.GetAutoIDAllocator(); alloc != nil {
return autoid.NewAllocators(false, alloc), true
}
}
// If the session is not in a txn, for example, in "show create table", use the original allocator.
}
return
}

// GetExprCtx returns the ExprContext
Expand Down Expand Up @@ -145,6 +153,48 @@ func (ctx *TableContextImpl) UpdatePhysicalTableDelta(
}
}

// GetCachedTableSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetCachedTableSupport() (context.CachedTableSupport, bool) {
if ctx.vars().TxnCtx != nil {
return ctx, true
}
return nil, false
}

// AddCachedTableHandleToTxn implements `CachedTableSupport` interface
func (ctx *TableContextImpl) AddCachedTableHandleToTxn(tableID int64, handle any) {
txnCtx := ctx.vars().TxnCtx
if txnCtx.CachedTables == nil {
txnCtx.CachedTables = make(map[int64]any)
}
if _, ok := txnCtx.CachedTables[tableID]; !ok {
txnCtx.CachedTables[tableID] = handle
}
}

// GetTemporaryTableSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetTemporaryTableSupport() (context.TemporaryTableSupport, bool) {
if ctx.vars().TxnCtx == nil {
return nil, false
}
return ctx, true
}

// GetTemporaryTableSizeLimit implements TemporaryTableSupport interface.
func (ctx *TableContextImpl) GetTemporaryTableSizeLimit() int64 {
return ctx.vars().TMPTableSize
}

// AddTemporaryTableToTxn implements the TemporaryTableSupport interface.
func (ctx *TableContextImpl) AddTemporaryTableToTxn(tblInfo *model.TableInfo) (context.TemporaryTableHandler, bool) {
vars := ctx.vars()
if tbl := vars.GetTemporaryTable(tblInfo); tbl != nil {
tbl.SetModified(true)
return context.NewTemporaryTableHandler(tbl, vars.TemporaryTableData), true
}
return context.TemporaryTableHandler{}, false
}

func (ctx *TableContextImpl) vars() *variable.SessionVars {
return ctx.Context.GetSessionVars()
}
55 changes: 55 additions & 0 deletions pkg/table/contextimpl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@ package contextimpl_test
import (
"testing"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/contextimpl"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tipb/go-binlog"
"github.com/stretchr/testify/require"
)

type mockTemporaryData struct {
variable.TemporaryTableData
size int64
}

func (m *mockTemporaryData) GetTableSize(tableID int64) int64 {
return tableID*1000000 + m.size
}

func TestMutateContextImplFields(t *testing.T) {
sctx := mock.NewContext()
sctx.Mutations = make(map[int64]*binlog.TableMutation)
Expand Down Expand Up @@ -114,4 +125,48 @@ func TestMutateContextImplFields(t *testing.T) {
require.Equal(t, int64(1), deltaMap.Delta)
require.Equal(t, int64(2), deltaMap.Count)
require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize)
// cached table support
sctx.GetSessionVars().TxnCtx = nil
cachedTableSupport, ok := ctx.GetCachedTableSupport()
require.False(t, ok)
require.Nil(t, cachedTableSupport)
sctx.GetSessionVars().TxnCtx = txnCtx
cachedTableSupport, ok = ctx.GetCachedTableSupport()
require.True(t, ok)
type mockCachedTable struct {
table.CachedTable
}
handle := &mockCachedTable{}
require.Nil(t, sctx.GetSessionVars().TxnCtx.CachedTables[123])
cachedTableSupport.AddCachedTableHandleToTxn(123, handle)
cached := sctx.GetSessionVars().TxnCtx.CachedTables[123]
require.Same(t, handle, cached)
// temporary table support
sctx.GetSessionVars().TxnCtx = nil
tempTableSupport, ok := ctx.GetTemporaryTableSupport()
require.False(t, ok)
require.Nil(t, tempTableSupport)
sctx.GetSessionVars().TxnCtx = txnCtx
mockTempData := &mockTemporaryData{}
sctx.GetSessionVars().TemporaryTableData = mockTempData
tempTableSupport, ok = ctx.GetTemporaryTableSupport()
require.True(t, ok)
require.Nil(t, txnCtx.TemporaryTables[456])
tmpTblHandler, ok := tempTableSupport.AddTemporaryTableToTxn(&model.TableInfo{
ID: 456,
TempTableType: model.TempTableGlobal,
})
require.True(t, ok)
require.NotNil(t, tmpTblHandler)
tmpTblTable := txnCtx.TemporaryTables[456]
require.NotNil(t, tmpTblTable)
require.True(t, tmpTblTable.GetModified())
require.Equal(t, int64(456000000), tmpTblHandler.GetCommittedSize())
mockTempData.size = 111
require.Equal(t, int64(456000111), tmpTblHandler.GetCommittedSize())
require.Equal(t, int64(0), tmpTblHandler.GetDirtySize())
tmpTblHandler.UpdateTxnDeltaSize(333)
require.Equal(t, int64(333), tmpTblHandler.GetDirtySize())
tmpTblHandler.UpdateTxnDeltaSize(-1)
require.Equal(t, int64(332), tmpTblHandler.GetDirtySize())
}
8 changes: 2 additions & 6 deletions pkg/table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,8 @@ func (c *cachedTable) AddRecord(sctx table.MutateContext, r []types.Datum, opts
}

func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTable) {
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.CachedTables == nil {
txnCtx.CachedTables = make(map[int64]any)
}
if _, ok := txnCtx.CachedTables[tid]; !ok {
txnCtx.CachedTables[tid] = handle
if s, ok := sctx.GetCachedTableSupport(); ok {
s.AddCachedTableHandleToTxn(tid, handle)
}
}

Expand Down
Loading

0 comments on commit 2ee8c99

Please sign in to comment.