diff --git a/infoschema/cache.go b/infoschema/cache.go index 34cc08eca2231..af5975ab95efc 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -15,6 +15,7 @@ package infoschema import ( + "fmt" "sort" "sync" @@ -38,13 +39,22 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by SchemaVersion in descending order cache []InfoSchema - // record SnapshotTS of the latest schema Insert. - maxUpdatedSnapshotTS uint64 + + // keep ts to version mapping + tsToVersion map[uint64]int64 + versionToTs map[int64]uint64 + // maintain the order everytime the cahce is updated + descSortedTs []uint64 } // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { - return &InfoCache{cache: make([]InfoSchema, 0, capacity)} + return &InfoCache{ + cache: make([]InfoSchema, 0, capacity), + tsToVersion: make(map[uint64]int64), + versionToTs: make(map[int64]uint64), + descSortedTs: make([]uint64, 0, capacity), + } } // Reset resets the cache. @@ -52,6 +62,9 @@ func (h *InfoCache) Reset(capacity int) { h.mu.Lock() defer h.mu.Unlock() h.cache = make([]InfoSchema, 0, capacity) + h.tsToVersion = make(map[uint64]int64) + h.versionToTs = make(map[int64]uint64) + h.descSortedTs = make([]uint64, 0, capacity) } // GetLatest gets the newest information schema. @@ -66,10 +79,26 @@ func (h *InfoCache) GetLatest() InfoSchema { return nil } +// GetVersionByTimestamp returns the schema version used at the specific timestamp +func (h *InfoCache) GetVersionByTimestamp(ts uint64) (int64, error) { + h.mu.RLock() + defer h.mu.RUnlock() + + i := sort.Search(len(h.descSortedTs), func(i int) bool { + return h.descSortedTs[i] <= ts + }) + if i < len(h.descSortedTs) { + return h.tsToVersion[h.descSortedTs[i]], nil + } + + return 0, fmt.Errorf("no schema cached for timestamp %d", ts) +} + // GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() + getVersionCounter.Inc() i := sort.Search(len(h.cache), func(i int) bool { return h.cache[i].SchemaMetaVersion() <= version @@ -108,11 +137,9 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema { defer h.mu.RUnlock() getTSCounter.Inc() - if snapshotTS >= h.maxUpdatedSnapshotTS { - if len(h.cache) > 0 { - hitTSCounter.Inc() - return h.cache[0] - } + if version, err := h.GetVersionByTimestamp(snapshotTS); err == nil { + hitTSCounter.Inc() + return h.GetByVersion(version) } return nil } @@ -129,9 +156,31 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { return h.cache[i].SchemaMetaVersion() <= version }) - if h.maxUpdatedSnapshotTS < snapshotTS { - h.maxUpdatedSnapshotTS = snapshotTS + // maintain ts to version mapping + ts, ok := h.versionToTs[version] + if ok { + // version exists, only update ts if the new one is smaller + if snapshotTS < ts { + h.versionToTs[version] = snapshotTS + delete(h.tsToVersion, ts) + h.tsToVersion[snapshotTS] = version + pos := sort.Search(len(h.descSortedTs), func(i int) bool { + return h.descSortedTs[i] < ts + }) + if pos > 0 { + h.descSortedTs[pos-1] = snapshotTS + } + } + } else { + // add the version + h.versionToTs[version] = snapshotTS + h.tsToVersion[snapshotTS] = version + h.descSortedTs = append(h.descSortedTs, snapshotTS) } + sort.Slice(h.descSortedTs, func(i, j int) bool { + // reverse the order + return h.descSortedTs[i] > h.descSortedTs[j] + }) // cached entry if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 83506bc4794d8..46b41722df0f9 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -129,3 +129,61 @@ func TestGetLatest(t *testing.T) { ic.Insert(is0, 0) require.Equal(t, is2, ic.GetLatest()) } + +func TestGetByTimestamp(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + require.Nil(t, ic.GetLatest()) + + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1, 1) + require.Equal(t, is1, ic.GetLatest()) + _, err := ic.GetVersionByTimestamp(0) + require.NotNil(t, err) + ver, err := ic.GetVersionByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), ver) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + ver, err = ic.GetVersionByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(1), ver) + require.Equal(t, is1, ic.GetBySnapshotTS(2)) + + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2, 2) + require.Equal(t, is2, ic.GetLatest()) + _, err = ic.GetVersionByTimestamp(0) + require.NotNil(t, err) + ver, err = ic.GetVersionByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), ver) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + ver, err = ic.GetVersionByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(2), ver) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) + ver, err = ic.GetVersionByTimestamp(3) + require.Nil(t, err) + require.Equal(t, int64(2), ver) + require.Equal(t, is2, ic.GetBySnapshotTS(3)) + + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0, 0) + require.Equal(t, is2, ic.GetLatest()) + ver, err = ic.GetVersionByTimestamp(0) + require.Nil(t, err) + require.Equal(t, int64(0), ver) + require.Equal(t, is0, ic.GetBySnapshotTS(0)) + ver, err = ic.GetVersionByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), ver) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + ver, err = ic.GetVersionByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(2), ver) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) + ver, err = ic.GetVersionByTimestamp(3) + require.Nil(t, err) + require.Equal(t, int64(2), ver) + require.Equal(t, is2, ic.GetBySnapshotTS(3)) +}