Skip to content

Commit

Permalink
cache schema version by timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
hihihuhu committed Jan 25, 2023
1 parent 8ec2612 commit 5982d7c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 10 deletions.
69 changes: 59 additions & 10 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package infoschema

import (
"fmt"
"sort"
"sync"

Expand All @@ -38,20 +39,32 @@ type InfoCache struct {
mu sync.RWMutex
// cache is sorted by SchemaVersion in descending order
cache []InfoSchema
// record SnapshotTS of the latest schema Insert.
maxUpdatedSnapshotTS uint64

// keep ts to version mapping
tsToVersion map[uint64]int64
versionToTs map[int64]uint64
// maintain the order everytime the cahce is updated
descSortedTs []uint64
}

// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{cache: make([]InfoSchema, 0, capacity)}
return &InfoCache{
cache: make([]InfoSchema, 0, capacity),
tsToVersion: make(map[uint64]int64),
versionToTs: make(map[int64]uint64),
descSortedTs: make([]uint64, 0, capacity),
}
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]InfoSchema, 0, capacity)
h.tsToVersion = make(map[uint64]int64)
h.versionToTs = make(map[int64]uint64)
h.descSortedTs = make([]uint64, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand All @@ -66,10 +79,26 @@ func (h *InfoCache) GetLatest() InfoSchema {
return nil
}

// GetVersionByTimestamp returns the schema version used at the specific timestamp
func (h *InfoCache) GetVersionByTimestamp(ts uint64) (int64, error) {
h.mu.RLock()
defer h.mu.RUnlock()

i := sort.Search(len(h.descSortedTs), func(i int) bool {
return h.descSortedTs[i] <= ts
})
if i < len(h.descSortedTs) {
return h.tsToVersion[h.descSortedTs[i]], nil
}

return 0, fmt.Errorf("no schema cached for timestamp %d", ts)
}

// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded.
func (h *InfoCache) GetByVersion(version int64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()

getVersionCounter.Inc()
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
Expand Down Expand Up @@ -108,11 +137,9 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
defer h.mu.RUnlock()

getTSCounter.Inc()
if snapshotTS >= h.maxUpdatedSnapshotTS {
if len(h.cache) > 0 {
hitTSCounter.Inc()
return h.cache[0]
}
if version, err := h.GetVersionByTimestamp(snapshotTS); err == nil {
hitTSCounter.Inc()
return h.GetByVersion(version)
}
return nil
}
Expand All @@ -129,9 +156,31 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
return h.cache[i].SchemaMetaVersion() <= version
})

if h.maxUpdatedSnapshotTS < snapshotTS {
h.maxUpdatedSnapshotTS = snapshotTS
// maintain ts to version mapping
ts, ok := h.versionToTs[version]
if ok {
// version exists, only update ts if the new one is smaller
if snapshotTS < ts {
h.versionToTs[version] = snapshotTS
delete(h.tsToVersion, ts)
h.tsToVersion[snapshotTS] = version
pos := sort.Search(len(h.descSortedTs), func(i int) bool {
return h.descSortedTs[i] < ts
})
if pos > 0 {
h.descSortedTs[pos-1] = snapshotTS
}
}
} else {
// add the version
h.versionToTs[version] = snapshotTS
h.tsToVersion[snapshotTS] = version
h.descSortedTs = append(h.descSortedTs, snapshotTS)
}
sort.Slice(h.descSortedTs, func(i, j int) bool {
// reverse the order
return h.descSortedTs[i] > h.descSortedTs[j]
})

// cached entry
if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version {
Expand Down
58 changes: 58 additions & 0 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,61 @@ func TestGetLatest(t *testing.T) {
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
}

func TestGetByTimestamp(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Nil(t, ic.GetLatest())

is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
require.Equal(t, is1, ic.GetLatest())
_, err := ic.GetVersionByTimestamp(0)
require.NotNil(t, err)
ver, err := ic.GetVersionByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), ver)
require.Equal(t, is1, ic.GetBySnapshotTS(1))
ver, err = ic.GetVersionByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(1), ver)
require.Equal(t, is1, ic.GetBySnapshotTS(2))

is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
ic.Insert(is2, 2)
require.Equal(t, is2, ic.GetLatest())
_, err = ic.GetVersionByTimestamp(0)
require.NotNil(t, err)
ver, err = ic.GetVersionByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), ver)
require.Equal(t, is1, ic.GetBySnapshotTS(1))
ver, err = ic.GetVersionByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(2), ver)
require.Equal(t, is2, ic.GetBySnapshotTS(2))
ver, err = ic.GetVersionByTimestamp(3)
require.Nil(t, err)
require.Equal(t, int64(2), ver)
require.Equal(t, is2, ic.GetBySnapshotTS(3))

is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0)
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
ver, err = ic.GetVersionByTimestamp(0)
require.Nil(t, err)
require.Equal(t, int64(0), ver)
require.Equal(t, is0, ic.GetBySnapshotTS(0))
ver, err = ic.GetVersionByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), ver)
require.Equal(t, is1, ic.GetBySnapshotTS(1))
ver, err = ic.GetVersionByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(2), ver)
require.Equal(t, is2, ic.GetBySnapshotTS(2))
ver, err = ic.GetVersionByTimestamp(3)
require.Nil(t, err)
require.Equal(t, int64(2), ver)
require.Equal(t, is2, ic.GetBySnapshotTS(3))
}

0 comments on commit 5982d7c

Please sign in to comment.