Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain, session: Add new sysvarcache to replace global values cache #24359

Merged
merged 30 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
24bbbdc
WIP
morgo Apr 27, 2021
53cb128
WIP
morgo Apr 27, 2021
670b2c4
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo Apr 27, 2021
b18efb6
WIP
morgo Apr 27, 2021
e2897dd
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo Apr 27, 2021
6b0d8c1
WIP
morgo Apr 27, 2021
6bf8f5e
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo Apr 28, 2021
305de84
WIP
morgo Apr 28, 2021
f586e94
Add safer fallback for cache miss
morgo Apr 28, 2021
418e38a
Merge branch 'master' into sysvar-cache
morgo Apr 28, 2021
2b05f33
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo Apr 29, 2021
3be4256
Address PR feedback
morgo Apr 29, 2021
dbab440
Merge branch 'sysvar-cache' of github.com:morgo/tidb into sysvar-cache
morgo Apr 29, 2021
5c9f3e0
Update Get to use function with fallback
morgo May 6, 2021
253eeea
Merge branch 'master' into sysvar-cache
morgo May 6, 2021
c57e1d7
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo May 7, 2021
d88b969
Fix bad merge
morgo May 7, 2021
3d803f0
Fix bad merge
morgo May 7, 2021
76320e1
Merge branch 'sysvar-cache' of github.com:morgo/tidb into sysvar-cache
morgo May 7, 2021
144d11f
Remove space
morgo May 10, 2021
3964bcf
Merge branch 'master' into sysvar-cache
morgo May 10, 2021
32290d9
Address PR feedback
morgo May 11, 2021
59f3ff5
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo May 11, 2021
6eb6a1c
Merge branch 'sysvar-cache' of github.com:morgo/tidb into sysvar-cache
morgo May 11, 2021
59172fe
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo May 13, 2021
cddb747
Change automatic refresh interval to 30 seconds
morgo May 13, 2021
46d5b31
Make reading getsysvar list concurrency safe
morgo May 13, 2021
be30709
Merge remote-tracking branch 'upstream/master' into sysvar-cache
morgo May 13, 2021
b1e7a38
Merge branch 'master' into sysvar-cache
ti-chi-bot May 17, 2021
b5f8e08
Merge branch 'master' into sysvar-cache
ti-chi-bot May 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/explaintest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -663,8 +662,6 @@ func main() {
log.Fatal(fmt.Sprintf("%s failed", sql), zap.Error(err))
}
}
// Wait global variables to reload.
time.Sleep(domain.GlobalVariableCacheExpiry)

if _, err = mdb.Exec("set sql_mode='STRICT_TRANS_TABLES'"); err != nil {
log.Fatal("set sql_mode='STRICT_TRANS_TABLES' failed", zap.Error(err))
Expand Down
2 changes: 0 additions & 2 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,6 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum
_, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0")
c.Assert(err, IsNil)
}()
domain.GetDomain(s.se).GetGlobalVarsCache().Disable()

sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);"
sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;"
Expand All @@ -1083,7 +1082,6 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) {
_, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0")
c.Assert(err, IsNil)
}()
domain.GetDomain(s.se).GetGlobalVarsCache().Disable()

sql1 := "ALTER TABLE t ADD PRIMARY KEY (b) NONCLUSTERED;"
sql2 := "ALTER TABLE t MODIFY COLUMN b tinyint;"
Expand Down
78 changes: 76 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Domain struct {
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
gvc GlobalVariableCache
sysVarCache SysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
wg sync.WaitGroup
Expand Down Expand Up @@ -922,6 +922,60 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
return nil
}

// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop, it
// should be called only once in BootstrapSession.
func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {

morgo marked this conversation as resolved.
Show resolved Hide resolved
err := do.sysVarCache.RebuildSysVarCache(ctx)
if err != nil {
return err
}

var watchCh clientv3.WatchChan
duration := 5 * time.Minute
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey)
duration = 10 * time.Minute
}

do.wg.Add(1)
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("LoadSysVarCacheLoop exited.")
util.Recover(metrics.LabelDomain, "LoadSysVarCacheLoop", nil, false)
}()
var count int
for {
ok := true
select {
case <-do.exit:
return
case _, ok = <-watchCh:
case <-time.After(duration):
}
if !ok {
logutil.BgLogger().Error("LoadSysVarCacheLoop loop watch channel closed")
watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey)
count++
Copy link
Contributor

@zhangjinpeng87 zhangjinpeng87 Apr 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it will do endless retrying if there is something wrong with PD?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This code is copied from the loop for privilege cache. If the server is up, it should keep retrying.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I remember the design of endless retrying PD connectivity came up in the jepsen test. This was the recommended behavior.

if count > 10 {
time.Sleep(time.Duration(count) * time.Second)
}
continue
}

count = 0
logutil.BgLogger().Info("Rebuilding sysvar cache from etcd watch event.")
err := do.sysVarCache.RebuildSysVarCache(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think ReBuildSysVarCache Asyncrhonizely can guarantee the read-after-write consistency. After one tidb-server set the global variable and notify the etcd, another tidb-server may still use the old value in a short time range.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should clarify: It is read-after-write on a single server only. This is still very useful because it simplifies things like the testsuite.

The reason why it offers read-after-write is that the notify function runs the rebuild task on the initiating server immediately. So the initiating server actually rebuilds the cache twice (same design as privilege cache).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the corresponding PR where the privilege cache was changed to run immediately on the initiating server: #8886

metrics.LoadSysVarCacheCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
if err != nil {
logutil.BgLogger().Error("LoadSysVarCacheLoop failed", zap.Error(err))
}
}
}()
return nil
}

// PrivilegeHandle returns the MySQLPrivilege.
func (do *Domain) PrivilegeHandle() *privileges.Handle {
return do.privHandle
Expand Down Expand Up @@ -1300,7 +1354,10 @@ func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
}

const privilegeKey = "/tidb/privilege"
const (
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
)

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
// the key will get notification.
Expand All @@ -1322,6 +1379,23 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) {
}
}

// NotifyUpdateSysVarCache updates the sysvar cache key in etcd, which other TiDB
// clients are subscribed to for updates. For the caller, the cache is also built
// synchronously so that the effect is immediate.
func (do *Domain) NotifyUpdateSysVarCache(ctx sessionctx.Context) {
if do.etcdClient != nil {
row := do.etcdClient.KV
_, err := row.Put(context.Background(), sysVarCacheKey, "")
if err != nil {
logutil.BgLogger().Warn("notify update sysvar cache failed", zap.Error(err))
}
}
// update locally
if err := do.sysVarCache.RebuildSysVarCache(ctx); err != nil {
logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err))
}
}

// ServerID gets serverID.
func (do *Domain) ServerID() uint64 {
return atomic.LoadUint64(&do.serverID)
Expand Down
135 changes: 0 additions & 135 deletions domain/global_vars_cache.go

This file was deleted.

Loading