Skip to content

Commit

Permalink
infoschema: revert #40768 for correctness (#41540)
Browse files Browse the repository at this point in the history
close #41539
  • Loading branch information
xhebox authored Feb 17, 2023
1 parent 69a9dec commit 234b789
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 117 deletions.
79 changes: 23 additions & 56 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package infoschema

import (
"fmt"
"sort"
"sync"

Expand All @@ -37,27 +36,22 @@ var (
// It only promised to cache the infoschema, if it is newer than all the cached.
type InfoCache struct {
mu sync.RWMutex
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp
}

type schemaAndTimestamp struct {
infoschema InfoSchema
timestamp int64
// cache is sorted by SchemaVersion in descending order
cache []InfoSchema
// record SnapshotTS of the latest schema Insert.
maxUpdatedSnapshotTS uint64
}

// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
}
return &InfoCache{cache: make([]InfoSchema, 0, capacity)}
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]schemaAndTimestamp, 0, capacity)
h.cache = make([]InfoSchema, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand All @@ -67,40 +61,18 @@ func (h *InfoCache) GetLatest() InfoSchema {
getLatestCounter.Inc()
if len(h.cache) > 0 {
hitLatestCounter.Inc()
return h.cache[0].infoschema
return h.cache[0]
}
return nil
}

// GetSchemaByTimestamp returns the schema used at the specific timestamp
func (h *InfoCache) GetSchemaByTimestamp(ts uint64) (InfoSchema, error) {
h.mu.RLock()
defer h.mu.RUnlock()
return h.getSchemaByTimestampNoLock(ts)
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, error) {
i := sort.Search(len(h.cache), func(i int) bool {
return uint64(h.cache[i].timestamp) <= ts
})
if i < len(h.cache) {
return h.cache[i].infoschema, nil
}

return nil, fmt.Errorf("no schema cached for timestamp %d", ts)
}

// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded.
func (h *InfoCache) GetByVersion(version int64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()
return h.getByVersionNoLock(version)
}

func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
getVersionCounter.Inc()
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].infoschema.SchemaMetaVersion() <= version
return h.cache[i].SchemaMetaVersion() <= version
})

// `GetByVersion` is allowed to load the latest schema that is less than argument `version`.
Expand All @@ -121,9 +93,9 @@ func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
// }
// ```

if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) {
hitVersionCounter.Inc()
return h.cache[i].infoschema
return h.cache[i]
}
return nil
}
Expand All @@ -136,9 +108,11 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
defer h.mu.RUnlock()

getTSCounter.Inc()
if schema, err := h.getSchemaByTimestampNoLock(snapshotTS); err == nil {
hitTSCounter.Inc()
return schema
if snapshotTS >= h.maxUpdatedSnapshotTS {
if len(h.cache) > 0 {
hitTSCounter.Inc()
return h.cache[0]
}
}
return nil
}
Expand All @@ -151,36 +125,29 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
defer h.mu.Unlock()

version := is.SchemaMetaVersion()

// assume this is the timestamp order as well
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].infoschema.SchemaMetaVersion() <= version
return h.cache[i].SchemaMetaVersion() <= version
})

if h.maxUpdatedSnapshotTS < snapshotTS {
h.maxUpdatedSnapshotTS = snapshotTS
}

// cached entry
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version {
if h.cache[i].timestamp > int64(snapshotTS) {
h.cache[i].timestamp = int64(snapshotTS)
}
if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version {
return true
}

if len(h.cache) < cap(h.cache) {
// has free space, grown the slice
h.cache = h.cache[:len(h.cache)+1]
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(snapshotTS),
}
h.cache[i] = is
return true
} else if i < len(h.cache) {
// drop older schema
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(snapshotTS),
}
h.cache[i] = is
return true
}
// older than all cached schemas, refuse to cache it
Expand Down
64 changes: 3 additions & 61 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestInsert(t *testing.T) {
ic.Insert(is5, 5)
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
require.Equal(t, is2, ic.GetBySnapshotTS(2))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(10))

// older
Expand All @@ -59,7 +59,7 @@ func TestInsert(t *testing.T) {
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
require.Equal(t, is2, ic.GetBySnapshotTS(2))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is6, ic.GetBySnapshotTS(10))

// replace 2, drop 2
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestInsert(t *testing.T) {
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(5))
require.Nil(t, ic.GetBySnapshotTS(5))
require.Equal(t, is6, ic.GetBySnapshotTS(10))
}

Expand Down Expand Up @@ -129,61 +129,3 @@ func TestGetLatest(t *testing.T) {
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
}

func TestGetByTimestamp(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Nil(t, ic.GetLatest())

is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
require.Equal(t, is1, ic.GetLatest())
_, err := ic.GetSchemaByTimestamp(0)
require.NotNil(t, err)
schema, err := ic.GetSchemaByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
schema, err = ic.GetSchemaByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(2))

is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
ic.Insert(is2, 2)
require.Equal(t, is2, ic.GetLatest())
_, err = ic.GetSchemaByTimestamp(0)
require.NotNil(t, err)
schema, err = ic.GetSchemaByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
schema, err = ic.GetSchemaByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(2))
schema, err = ic.GetSchemaByTimestamp(3)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(3))

is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0)
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
schema, err = ic.GetSchemaByTimestamp(0)
require.Nil(t, err)
require.Equal(t, int64(0), schema.SchemaMetaVersion())
require.Equal(t, is0, ic.GetBySnapshotTS(0))
schema, err = ic.GetSchemaByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
schema, err = ic.GetSchemaByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(2))
schema, err = ic.GetSchemaByTimestamp(3)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(3))
}

0 comments on commit 234b789

Please sign in to comment.