Skip to content

Commit

Permalink
*: Fix infoschema/perfschema concurrent bugs (pingcap#1232)
Browse files Browse the repository at this point in the history
* *: Fix infoschema/perfschema concurrent bugs

Make it safe to create Handle on multiple stores.
  • Loading branch information
shenli committed May 16, 2016
1 parent 2a98df9 commit 93577c1
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 87 deletions.
12 changes: 10 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/perfschema"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/terror"
Expand Down Expand Up @@ -97,6 +98,11 @@ func (do *Domain) InfoSchema() infoschema.InfoSchema {
return do.infoHandle.Get()
}

// PerfSchema gets performance schema from domain.
func (do *Domain) PerfSchema() perfschema.PerfSchema {
return do.infoHandle.GetPerfHandle()
}

// DDL gets DDL from domain.
func (do *Domain) DDL() ddl.DDL {
return do.ddl
Expand Down Expand Up @@ -257,8 +263,10 @@ func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {
store: store,
leaseCh: make(chan time.Duration, 1),
}

d.infoHandle = infoschema.NewHandle(d.store)
d.infoHandle, err = infoschema.NewHandle(d.store)
if err != nil {
return nil, errors.Trace(err)
}
d.ddl = ddl.NewDDL(d.store, d.infoHandle, &ddlCallback{do: d}, lease)
d.mustReload()

Expand Down
92 changes: 57 additions & 35 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,27 +228,45 @@ func (is *infoSchema) Clone() (result []*model.DBInfo) {

// Handle handles information schema, including getting and setting.
type Handle struct {
value atomic.Value
store kv.Storage
value atomic.Value
store kv.Storage
memSchema *memSchemaHandle
}

// NewHandle creates a new Handle.
func NewHandle(store kv.Storage) *Handle {
func NewHandle(store kv.Storage) (*Handle, error) {
h := &Handle{
store: store,
}
// init memory tables
initMemoryTables()
initPerfSchema()
return h
var err error
h.memSchema, err = newMemSchemaHandle()
if err != nil {
return nil, errors.Trace(err)
}
return h, nil
}

func initPerfSchema() {
perfHandle = perfschema.NewPerfHandle()
// Init memory schemas including infoschema and perfshcema.
func newMemSchemaHandle() (*memSchemaHandle, error) {
h := &memSchemaHandle{
nameToTable: make(map[string]table.Table),
}
err := initMemoryTables(h)
if err != nil {
return nil, errors.Trace(err)
}
initMemoryTables(h)
h.perfHandle, err = perfschema.NewPerfHandle()
if err != nil {
return nil, errors.Trace(err)
}
return h, nil
}

var (
// Information_Schema
// memSchemaHandle is used to store memory schema information.
type memSchemaHandle struct {
// Information Schema
isDB *model.DBInfo
schemataTbl table.Table
tablesTbl table.Table
Expand All @@ -261,18 +279,17 @@ var (
profilingTbl table.Table
partitionsTbl table.Table
nameToTable map[string]table.Table

// Performance Schema
perfHandle perfschema.PerfSchema
)
}

func initMemoryTables() error {
func initMemoryTables(h *memSchemaHandle) error {
// Init Information_Schema
var (
err error
tbl table.Table
)
dbID := autoid.GenLocalSchemaID()
nameToTable = make(map[string]table.Table)
isTables := make([]*model.TableInfo, 0, len(tableNameToColumns))
for name, cols := range tableNameToColumns {
meta := buildTableMeta(name, cols)
Expand All @@ -286,26 +303,26 @@ func initMemoryTables() error {
if err != nil {
return errors.Trace(err)
}
nameToTable[meta.Name.L] = tbl
h.nameToTable[meta.Name.L] = tbl
}
schemataTbl = nameToTable[strings.ToLower(tableSchemata)]
tablesTbl = nameToTable[strings.ToLower(tableTables)]
columnsTbl = nameToTable[strings.ToLower(tableColumns)]
statisticsTbl = nameToTable[strings.ToLower(tableStatistics)]
charsetTbl = nameToTable[strings.ToLower(tableCharacterSets)]
collationsTbl = nameToTable[strings.ToLower(tableCollations)]
h.schemataTbl = h.nameToTable[strings.ToLower(tableSchemata)]
h.tablesTbl = h.nameToTable[strings.ToLower(tableTables)]
h.columnsTbl = h.nameToTable[strings.ToLower(tableColumns)]
h.statisticsTbl = h.nameToTable[strings.ToLower(tableStatistics)]
h.charsetTbl = h.nameToTable[strings.ToLower(tableCharacterSets)]
h.collationsTbl = h.nameToTable[strings.ToLower(tableCollations)]

// CharacterSets/Collations contain static data. Init them now.
err = insertData(charsetTbl, dataForCharacterSets())
err = insertData(h.charsetTbl, dataForCharacterSets())
if err != nil {
return errors.Trace(err)
}
err = insertData(collationsTbl, dataForColltions())
err = insertData(h.collationsTbl, dataForColltions())
if err != nil {
return errors.Trace(err)
}
// create db
isDB = &model.DBInfo{
h.isDB = &model.DBInfo{
ID: dbID,
Name: model.NewCIStr(Name),
Charset: mysql.DefaultCharset,
Expand Down Expand Up @@ -386,15 +403,15 @@ func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) error {
}
}
// Build Information_Schema
info.schemaNameToID[isDB.Name.L] = isDB.ID
info.schemas[isDB.ID] = isDB
for _, t := range isDB.Tables {
tbl, ok := nameToTable[t.Name.L]
info.schemaNameToID[h.memSchema.isDB.Name.L] = h.memSchema.isDB.ID
info.schemas[h.memSchema.isDB.ID] = h.memSchema.isDB
for _, t := range h.memSchema.isDB.Tables {
tbl, ok := h.memSchema.nameToTable[t.Name.L]
if !ok {
return ErrTableNotExists.Gen("table `%s` is missing.", t.Name)
}
info.tables[t.ID] = tbl
tname := tableName{isDB.Name.L, t.Name.L}
tname := tableName{h.memSchema.isDB.Name.L, t.Name.L}
info.tableNameToID[tname] = t.ID
for _, c := range t.Columns {
info.columns[c.ID] = c
Expand All @@ -403,11 +420,11 @@ func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) error {
}

// Add Performance_Schema
psDB := perfHandle.GetDBMeta()
psDB := h.memSchema.perfHandle.GetDBMeta()
info.schemaNameToID[psDB.Name.L] = psDB.ID
info.schemas[psDB.ID] = psDB
for _, t := range psDB.Tables {
tbl, ok := perfHandle.GetTable(t.Name.O)
tbl, ok := h.memSchema.perfHandle.GetTable(t.Name.O)
if !ok {
return ErrTableNotExists.Gen("table `%s` is missing.", t.Name)
}
Expand All @@ -427,19 +444,19 @@ func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) error {
dbNames = append(dbNames, v.Name.L)
dbInfos = append(dbInfos, v)
}
err = refillTable(schemataTbl, dataForSchemata(dbNames))
err = refillTable(h.memSchema.schemataTbl, dataForSchemata(dbNames))
if err != nil {
return errors.Trace(err)
}
err = refillTable(tablesTbl, dataForTables(dbInfos))
err = refillTable(h.memSchema.tablesTbl, dataForTables(dbInfos))
if err != nil {
return errors.Trace(err)
}
err = refillTable(columnsTbl, dataForColumns(dbInfos))
err = refillTable(h.memSchema.columnsTbl, dataForColumns(dbInfos))
if err != nil {
return errors.Trace(err)
}
err = refillTable(statisticsTbl, dataForStatistics(dbInfos))
err = refillTable(h.memSchema.statisticsTbl, dataForStatistics(dbInfos))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -454,6 +471,11 @@ func (h *Handle) Get() InfoSchema {
return schema
}

// GetPerfHandle gets performance schema from handle.
func (h *Handle) GetPerfHandle() perfschema.PerfSchema {
return h.memSchema.perfHandle
}

// Schema error codes.
const (
codeDBDropExists terror.ErrCode = 1008
Expand Down
35 changes: 34 additions & 1 deletion infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package infoschema_test

import (
"fmt"
"sync"
"testing"

"github.com/juju/errors"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/pingcap/tidb/perfschema"
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/store/localstore/goleveldb"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/types"
)
Expand All @@ -40,12 +43,14 @@ type testSuite struct {
}

func (*testSuite) TestT(c *C) {
defer testleak.AfterTest(c)()
driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}}
store, err := driver.Open("memory")
c.Assert(err, IsNil)
defer store.Close()

handle := infoschema.NewHandle(store)
handle, err := infoschema.NewHandle(store)
c.Assert(err, IsNil)
dbName := model.NewCIStr("Test")
tbName := model.NewCIStr("T")
colName := model.NewCIStr("A")
Expand Down Expand Up @@ -194,6 +199,34 @@ func (*testSuite) TestT(c *C) {
c.Assert(col, NotNil)
}

// Make sure it is safe to concurrently create handle on multiple stores.
func (testSuite) TestConcurrent(c *C) {
defer testleak.AfterTest(c)()
storeCount := 5
stores := make([]kv.Storage, storeCount)
for i := 0; i < storeCount; i++ {
driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}}
store, err := driver.Open(fmt.Sprintf("memory_path_%d", i))
c.Assert(err, IsNil)
stores[i] = store
}
defer func() {
for _, store := range stores {
store.Close()
}
}()
var wg sync.WaitGroup
wg.Add(storeCount)
for _, store := range stores {
go func(s kv.Storage) {
defer wg.Done()
_, err := infoschema.NewHandle(s)
c.Assert(err, IsNil)
}(store)
}
wg.Wait()
}

func genGlobalID(store kv.Storage) (int64, error) {
var globalID int64
err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error {
Expand Down
24 changes: 8 additions & 16 deletions perfschema/perfschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package perfschema

import (
"reflect"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -54,29 +55,20 @@ type perfSchema struct {
tables map[string]*model.TableInfo
mTables map[string]table.Table // Memory tables for perfSchema
stmtHandles []int64
stmtInfos map[reflect.Type]*statementInfo
}

var _ PerfSchema = (*perfSchema)(nil)

// PerfHandle is the only access point for the in-memory performance schema information
var (
PerfHandle PerfSchema
)

// NewPerfHandle creates a new perfSchema on store.
func NewPerfHandle() PerfSchema {
schema := PerfHandle.(*perfSchema)
func NewPerfHandle() (PerfSchema, error) {
schema := &perfSchema{}
err := schema.initialize()
if err != nil {
log.Fatal(errors.ErrorStack(err))
return nil, errors.Trace(err)
}
registerStatements()
return PerfHandle
}

func init() {
schema := &perfSchema{}
PerfHandle = schema
schema.registerStatements()
return schema, nil
}

// perfschema error codes.
Expand Down
Loading

0 comments on commit 93577c1

Please sign in to comment.