From 69e47f67b11b5e322a3c08937eb2e31693df721f Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Wed, 24 May 2023 22:53:40 -0700 Subject: [PATCH] mcs, tso: fix logging metrics issues (#95) * metrics: add tso events metrics (#6501) close tikv/pd#6502 Signed-off-by: bufferflies <1045931706@qq.com> * keyspace: add benchmarks for keyspace assignment patrol (#6507) ref tikv/pd#5895 Add benchmarks for keyspace assignment patrol. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> * Print keyspace-group-id zap field in the tso log conditionally (#6514) ref tikv/pd#5895 To keep the logging info clean in on-premises deployments, we only print the keyspace-group-id zap field for non-default keyspace group IDs. Signed-off-by: Bin Shi Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> * Improve logging when tso keyspace group meta is updated. (#6513) close tikv/pd#6512 Improve logging when tso keyspace group meta is updated. Signed-off-by: Bin Shi Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> * tso: log tso service discovery info on the client side only when the primary is changed (#6511) close tikv/pd#6508 Skip logging of the tso service discovery info when the secondary list is changed, because tso servers currently don't have a consistent view of the member list: the tso service uses remote etcd, so the member list keeps changing as the client queries the tso servers in round-robin. We need to improve the server side so that all tso servers can return a globally consistent view of the keyspace groups' serving and membership info. Signed-off-by: Bin Shi Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --------- Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: buffer <1045931706@qq.com> Co-authored-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/tso_service_discovery.go | 8 ++-- metrics/grafana/pd.json | 10 ++++ pkg/keyspace/keyspace.go | 16 +++++++ pkg/keyspace/keyspace_test.go | 44 +++++++++++++++++ pkg/tso/allocator_manager.go | 80 +++++++++++++++++++++++++------ pkg/tso/global_allocator.go | 73 ++++++++++++++++++++++------ pkg/tso/keyspace_group_manager.go | 13 +++-- pkg/utils/logutil/log.go | 10 ++++ 8 files changed, 215 insertions(+), 39 deletions(-) diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 13d6ab24993..e3c67a92fa6 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -488,17 +488,15 @@ func (c *tsoServiceDiscovery) updateMember() error { } } - oldPrimary, primarySwitched, secondaryChanged := + oldPrimary, primarySwitched, _ := c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs) if primarySwitched { + log.Info("[tso] updated keyspace group service discovery info", + zap.String("keyspace-group-service", keyspaceGroup.String())) if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil { return err } } - if primarySwitched || secondaryChanged { - log.Info("[tso] updated keyspace group service discovery info", - zap.String("keyspace-group-service", keyspaceGroup.String())) - } // Even if the primary address is empty, we still updated other returned info above, including the // keyspace group info and the secondary addresses.
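Note on the client-side change above: the discovery info is now logged only when the primary switches, since secondary-list churn is expected while tso servers lack a consistent member view. A minimal, self-contained Go sketch of this pattern, assuming the same pingcap/log and zap dependencies the client already uses; the discoveryState type and its update method are illustrative stand-ins, not the client's real API:

package main

import (
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// discoveryState is a hypothetical stand-in for the client's keyspace
// group service discovery state; only the primary address matters here.
type discoveryState struct {
	primary string
}

// update records the new primary and reports whether it switched.
func (d *discoveryState) update(newPrimary string) (old string, switched bool) {
	old, switched = d.primary, d.primary != newPrimary
	d.primary = newPrimary
	return
}

func main() {
	d := &discoveryState{}
	for _, addr := range []string{"tso-0:3379", "tso-0:3379", "tso-1:3379"} {
		// Secondary-list churn is ignored entirely; only a primary
		// switch is worth a log line, mirroring the hunk above.
		if _, switched := d.update(addr); switched {
			log.Info("[tso] updated keyspace group service discovery info",
				zap.String("new-primary", addr))
		}
	}
}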
diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index c3c980ca848..ba1665ca21c 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -10035,6 +10035,16 @@ "legendFormat": "tso request/secs", "refId": "A", "step": 2 + }, + { + "expr": "sum(rate(pd_tso_events{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) by (type,dc)", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{type}}-{{dc}}", + "hide": true, + "refId": "B", + "step": 2 } ], "thresholds": [], diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index c81c1cbf86c..aa8a68b6ef8 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -647,6 +647,10 @@ func (manager *Manager) allocID() (uint32, error) { // PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups. func (manager *Manager) PatrolKeyspaceAssignment() error { var ( + // Some statistics info. + start = time.Now() + patrolledKeyspaceCount uint64 + assignedKeyspaceCount uint64 // The current start ID of the patrol, used for logging. currentStartID = manager.nextPatrolStartID // The next start ID of the patrol, used for the next patrol. @@ -654,6 +658,16 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { moreToPatrol = true err error ) + defer func() { + log.Debug("[keyspace] patrol keyspace assignment finished", + zap.Duration("cost", time.Since(start)), + zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount), + zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount), + zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Uint32("current-start-id", currentStartID), + zap.Uint32("next-start-id", nextStartID), + ) + }() for moreToPatrol { err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID) @@ -692,6 +706,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if ks == nil { continue } + patrolledKeyspaceCount++ manager.metaLock.Lock(ks.Id) if ks.Config == nil { ks.Config = make(map[string]string, 1) @@ -718,6 +733,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { zap.Uint32("keyspace-id", ks.Id), zap.Error(err)) return err } + assignedKeyspaceCount++ } if assigned { err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup) diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index 19e7d97c9d9..948fe434088 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -434,3 +434,47 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } } + +// Benchmark the keyspace assignment patrol. +func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) { + benchmarkPatrolKeyspaceAssignmentN(1000, b) +} + +func BenchmarkPatrolKeyspaceAssignment10000(b *testing.B) { + benchmarkPatrolKeyspaceAssignmentN(10000, b) +} + +func BenchmarkPatrolKeyspaceAssignment100000(b *testing.B) { + benchmarkPatrolKeyspaceAssignmentN(100000, b) +} + +func benchmarkPatrolKeyspaceAssignmentN( + n int, b *testing.B, +) { + suite := new(keyspaceTestSuite) + suite.SetT(&testing.T{}) + suite.SetupSuite() + suite.SetupTest() + re := suite.Require() + // Create some keyspaces without any keyspace group. 
+ for i := 1; i <= n; i++ { + now := time.Now().Unix() + err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ + Id: uint32(i), + Name: strconv.Itoa(i), + State: keyspacepb.KeyspaceState_ENABLED, + CreatedAt: now, + StateChangedAt: now, + }) + re.NoError(err) + } + // Benchmark the keyspace assignment patrol. + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := suite.manager.PatrolKeyspaceAssignment() + re.NoError(err) + } + b.StopTimer() + suite.TearDownTest() + suite.TearDownSuite() +} diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 35c671b2236..ed881b1c7df 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -272,6 +272,14 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc go am.allocatorLeaderLoop(parentCtx, localTSOAllocator) } +// getGroupID returns the keyspace group ID of the allocator manager. +func (am *AllocatorManager) getGroupID() uint32 { + if am == nil { + return 0 + } + return am.kgID +} + // GetTimestampPath returns the timestamp path in etcd for the given DCLocation. func (am *AllocatorManager) GetTimestampPath(dcLocation string) string { if am == nil { @@ -295,13 +303,13 @@ func (am *AllocatorManager) tsoAllocatorLoop() { defer am.svcLoopWG.Done() am.AllocatorDaemon(am.ctx) - log.Info("exit allocator loop") + log.Info("exit allocator loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) } // close is used to shutdown TSO Allocator updating daemon. // tso service call this function to shutdown the loop here, but pd manages its own loop. func (am *AllocatorManager) close() { - log.Info("closing the allocator manager") + log.Info("closing the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist { allocatorGroup.allocator.(*GlobalTSOAllocator).close() @@ -310,7 +318,7 @@ func (am *AllocatorManager) close() { am.cancel() am.svcLoopWG.Wait() - log.Info("closed the allocator manager") + log.Info("closed the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) } // GetMember returns the ElectionMember of this AllocatorManager. 
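The getGroupID helper added above is deliberately safe to call through a nil *AllocatorManager: a Go method with a pointer receiver may run on a nil receiver as long as it never dereferences it, so call sites (and the keyspace-group-id log fields built from it) need no guards before the manager is initialized. A tiny illustrative sketch, with names standing in for the real types:

package main

import "fmt"

type allocatorManager struct{ kgID uint32 }

// getGroupID only dereferences the receiver after the nil check, so an
// uninitialized caller simply gets the default keyspace group ID (0).
func (am *allocatorManager) getGroupID() uint32 {
	if am == nil {
		return 0
	}
	return am.kgID
}

func main() {
	var am *allocatorManager                               // nil, e.g. not yet set up
	fmt.Println(am.getGroupID())                           // prints 0, no panic
	fmt.Println((&allocatorManager{kgID: 5}).getGroupID()) // prints 5
}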
@@ -325,6 +333,7 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error { serverID := am.member.ID() if err := am.checkDCLocationUpperLimit(dcLocation); err != nil { log.Error("check dc-location upper limit failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID != 0), zap.Int("upper-limit", int(math.Pow(2, MaxSuffixBits))-1), zap.String("dc-location", dcLocation), zap.String("server-name", serverName), @@ -343,12 +352,14 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error { } if !resp.Succeeded { log.Warn("write dc-location configuration into etcd failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", dcLocation), zap.String("server-name", serverName), zap.Uint64("server-id", serverID)) return errs.ErrEtcdTxnConflict.FastGenByArgs() } log.Info("write dc-location configuration into etcd", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", dcLocation), zap.String("server-name", serverName), zap.Uint64("server-id", serverID)) @@ -391,6 +402,7 @@ func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations dcLocation := string(kv.Value) if err != nil { log.Warn("get server id and dcLocation from etcd failed, invalid server id", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.Any("splitted-serverPath", serverPath), zap.String("dc-location", dcLocation), errs.ZapError(err)) @@ -425,7 +437,9 @@ func (am *AllocatorManager) CleanUpDCLocation() error { } else if !resp.Succeeded { return errs.ErrEtcdTxnConflict.FastGenByArgs() } - log.Info("delete the dc-location key previously written in etcd", zap.Uint64("server-id", serverID)) + log.Info("delete the dc-location key previously written in etcd", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + zap.Uint64("server-id", serverID)) go am.ClusterDCLocationChecker() return nil } @@ -491,6 +505,7 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string { func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) { defer logutil.LogPanic() defer log.Info("server is closed, return local tso allocator leader loop", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.String("local-tso-allocator-name", am.member.Name())) for { @@ -507,11 +522,13 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * } if allocatorLeader != nil { log.Info("start to watch allocator leader", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader), zap.String("local-tso-allocator-name", am.member.Name())) // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed. 
allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev) log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation())) } @@ -519,6 +536,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation()) if err != nil { log.Error("get next leader from etcd failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), errs.ZapError(err)) time.Sleep(200 * time.Millisecond) @@ -528,6 +546,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * if nextLeader != 0 { if nextLeader != am.member.ID() { log.Info("skip campaigning of the local tso allocator leader and check later", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("server-name", am.member.Name()), zap.Uint64("server-id", am.member.ID()), zap.Uint64("next-leader-id", nextLeader)) @@ -542,6 +561,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation()) if err != nil { log.Error("get dc-location info from pd leader failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), errs.ZapError(err)) // PD leader hasn't been elected out, wait for the campaign @@ -552,6 +572,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * } if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil { log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("wait-duration", checkStep.String())) @@ -588,6 +609,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( isNextLeader bool, ) { log.Info("start to campaign local tso allocator leader", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) @@ -611,11 +633,13 @@ func (am *AllocatorManager) campaignAllocatorLeader( if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) } else { log.Error("failed to campaign local tso allocator leader due to etcd error", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name()), @@ -631,16 +655,19 @@ func (am *AllocatorManager) campaignAllocatorLeader( // Maintain the Local TSO Allocator leader go allocator.KeepAllocatorLeader(ctx) log.Info("campaign local tso allocator leader ok", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), 
zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) log.Info("initialize the local TSO allocator", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil { log.Error("failed to initialize the local TSO allocator", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), errs.ZapError(err)) @@ -649,6 +676,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( if dcLocationInfo.GetMaxTs().GetPhysical() != 0 { if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil { log.Error("failed to write the max local TSO after member changed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), errs.ZapError(err)) @@ -660,6 +688,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( // The next leader is me, delete it to finish campaigning am.deleteNextLeaderID(allocator.GetDCLocation()) log.Info("local tso allocator leader is ready to serve", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) @@ -672,6 +701,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( case <-leaderTicker.C: if !allocator.IsAllocatorLeader() { log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) @@ -680,6 +710,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( case <-ctx.Done(): // Server is closed and it should return nil. log.Info("server is closed, reset the local tso allocator", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("name", am.member.Name())) @@ -691,7 +722,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( // AllocatorDaemon is used to update every allocator's TSO and check whether we have // any new local allocator that needs to be set up. func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { - log.Info("entering into allocator daemon") + log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) // allocatorPatroller should only work when enableLocalTSO is true to // set up the new Local TSO Allocator in time. @@ -728,7 +759,7 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { // PS: ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run, // we should run them concurrently to speed up the progress. 
case <-ctx.Done(): - log.Info("exit allocator daemon") + log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) return } } @@ -755,17 +786,20 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { case <-ag.ctx.Done(): // Resetting the allocator will clear TSO in memory ag.allocator.Reset() - log.Info("exit the allocator update loop") + log.Info("exit the allocator update loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) return default: } if !ag.leadership.Check() { - log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation)) + log.Info("allocator doesn't campaign leadership yet", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + zap.String("dc-location", ag.dcLocation)) time.Sleep(200 * time.Millisecond) return } if err := ag.allocator.UpdateTSO(); err != nil { log.Warn("failed to update allocator's timestamp", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", ag.dcLocation), zap.String("name", am.member.Name()), errs.ZapError(err)) @@ -811,7 +845,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() { } newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd() if err != nil { - log.Error("get cluster dc-locations from etcd failed", errs.ZapError(err)) + log.Error("get cluster dc-locations from etcd failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + errs.ZapError(err)) return } am.mu.Lock() @@ -842,7 +878,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() { suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation) if err != nil { log.Warn("get or create the local tso suffix failed", - zap.String("dc-location", dcLocation), errs.ZapError(err)) + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + zap.String("dc-location", dcLocation), + errs.ZapError(err)) continue } if suffix > am.mu.maxSuffix { @@ -854,7 +892,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() { // Follower should check and update the am.mu.maxSuffix maxSuffix, err := am.getMaxLocalTSOSuffix() if err != nil { - log.Error("get the max local tso suffix from etcd failed", errs.ZapError(err)) + log.Error("get the max local tso suffix from etcd failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + errs.ZapError(err)) // Rollback the new dc-locations we update before for _, dcLocation := range newDCLocations { delete(am.mu.clusterDCLocations, dcLocation) @@ -899,6 +939,7 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32, } if !txnResp.Succeeded { log.Warn("write local tso suffix into etcd failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", dcLocation), zap.String("local-tso-suffix", localTSOSuffixValue), zap.String("server-name", am.member.Name()), @@ -981,12 +1022,14 @@ func (am *AllocatorManager) PriorityChecker() { // find this allocator's dc-location isn't the same with server of dc-2 but is same with itself. 
if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation { log.Info("try to move the local tso allocator", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.Uint64("old-leader-id", leaderServerID), zap.String("old-dc-location", leaderServerDCLocation), zap.Uint64("next-leader-id", serverID), zap.String("next-dc-location", myServerDCLocation)) if err := am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID()); err != nil { log.Error("move the local tso allocator failed", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.Uint64("old-leader-id", leaderServerID), zap.String("old-dc-location", leaderServerDCLocation), zap.Uint64("next-leader-id", serverID), @@ -1003,12 +1046,16 @@ func (am *AllocatorManager) PriorityChecker() { nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation) if err != nil { log.Error("get next leader from etcd failed", - zap.String("dc-location", allocatorGroup.dcLocation), errs.ZapError(err)) + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + zap.String("dc-location", allocatorGroup.dcLocation), + errs.ZapError(err)) continue } // nextLeader is not empty and isn't same with the server ID, resign the leader if nextLeader != 0 && nextLeader != serverID { - log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader)) + log.Info("next leader key found, resign current leader", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + zap.Uint64("nextLeaderID", nextLeader)) am.ResetAllocatorGroup(allocatorGroup.dcLocation) } } @@ -1284,6 +1331,7 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { if _, ok := am.localAllocatorConn.clientConns[addr]; ok { newConn.Close() log.Debug("use old connection", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) return @@ -1301,6 +1349,7 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u if err != nil { err = errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause() log.Error("failed to grant the lease of the next leader key", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", dcLocation), zap.Uint64("serverID", serverID), errs.ZapError(err)) @@ -1313,12 +1362,15 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u if err != nil { err = errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() log.Error("failed to write next leader key into etcd", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), zap.String("dc-location", dcLocation), zap.Uint64("serverID", serverID), errs.ZapError(err)) return err } if !resp.Succeeded { - log.Warn("write next leader id into etcd unsuccessfully", zap.String("dc-location", dcLocation)) + log.Warn("write next leader id into etcd unsuccessfully", + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), + zap.String("dc-location", dcLocation)) return errs.ErrEtcdTxnConflict.GenWithStack("write next leader id into etcd unsuccessfully") } return nil diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 1d961fd1b95..284d7dc316a 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -133,6 +133,14 @@ func (gta *GlobalTSOAllocator) close() { gta.wg.Wait() } +// getGroupID returns the keyspace group ID of the allocator. 
+func (gta *GlobalTSOAllocator) getGroupID() uint32 { + if gta.am == nil { + return 0 + } + return gta.am.getGroupID() +} + func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) { gta.syncRTT.Store(rtt) tsoGauge.WithLabelValues("global_tso_sync_rtt", gta.timestampOracle.dcLocation).Set(float64(rtt)) @@ -234,7 +242,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT. estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(count, suffixBits) if err != nil { - log.Error("global tso allocator estimates MaxTS failed", errs.ZapError(err)) + log.Error("global tso allocator estimates MaxTS failed", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + errs.ZapError(err)) continue } if shouldRetry { @@ -247,7 +257,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // we need to validate it first before we write it into every Local TSO Allocator's memory. globalTSOResp = *estimatedMaxTSO if err = gta.SyncMaxTS(ctx, dcLocationMap, &globalTSOResp, skipCheck); err != nil { - log.Error("global tso allocator synchronizes MaxTS failed", errs.ZapError(err)) + log.Error("global tso allocator synchronizes MaxTS failed", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + errs.ZapError(err)) continue } // 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO, @@ -271,7 +283,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // 4. Persist MaxTS into memory, and etcd if needed var currentGlobalTSO *pdpb.Timestamp if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil { - log.Error("global tso allocator gets the current global tso in memory failed", errs.ZapError(err)) + log.Error("global tso allocator gets the current global tso in memory failed", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + errs.ZapError(err)) continue } if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 { @@ -279,7 +293,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // Update the Global TSO in memory if err = gta.timestampOracle.resetUserTimestamp(gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil { tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc() - log.Error("global tso allocator update the global tso in memory failed", errs.ZapError(err)) + log.Error("global tso allocator update the global tso in memory failed", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + errs.ZapError(err)) continue } } @@ -314,6 +330,7 @@ func (gta *GlobalTSOAllocator) precheckLogical(maxTSO *pdpb.Timestamp, suffixBit // Check if the logical part will reach the overflow condition after being differentiated. 
if caliLogical := gta.timestampOracle.calibrateLogical(maxTSO.Logical, suffixBits); caliLogical >= maxLogical { log.Error("estimated logical part outside of max logical interval, please check ntp time", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow)) tsoCounter.WithLabelValues("precheck_logical_overflow", gta.timestampOracle.dcLocation).Inc() return false @@ -400,12 +417,14 @@ func (gta *GlobalTSOAllocator) SyncMaxTS( respCh <- syncMaxTSResp if syncMaxTSResp.err != nil { log.Error("sync max ts rpc failed, got an error", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("local-allocator-leader-url", leaderConn.Target()), errs.ZapError(err)) return } if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil { log.Error("sync max ts rpc failed, got an error", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("local-allocator-leader-url", leaderConn.Target()), errs.ZapError(errors.New(syncMaxTSResp.rpcRes.GetHeader().GetError().String()))) return @@ -456,7 +475,10 @@ func (gta *GlobalTSOAllocator) SyncMaxTS( // Check whether all dc-locations have been considered during the synchronization and retry once if any dc-location missed. if ok, unsyncedDCs := gta.checkSyncedDCs(dcLocationMap, syncedDCs); !ok { log.Info("unsynced dc-locations found, will retry", - zap.Bool("skip-check", skipCheck), zap.Strings("synced-DCs", syncedDCs), zap.Strings("unsynced-DCs", unsyncedDCs)) + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.Bool("skip-check", skipCheck), + zap.Strings("synced-DCs", syncedDCs), + zap.Strings("unsynced-DCs", unsyncedDCs)) if i < syncMaxRetryCount-1 { // maxTSO should remain the same. *maxTSO = originalMaxTSO @@ -483,7 +505,10 @@ func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string]DCLocatio unsyncedDCs = append(unsyncedDCs, dcLocation) } } - log.Debug("check unsynced dc-locations", zap.Strings("unsynced-DCs", unsyncedDCs), zap.Strings("synced-DCs", syncedDCs)) + log.Debug("check unsynced dc-locations", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.Strings("unsynced-DCs", unsyncedDCs), + zap.Strings("synced-DCs", syncedDCs)) return len(unsyncedDCs) == 0, unsyncedDCs } @@ -508,7 +533,8 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { for { select { case <-gta.ctx.Done(): - log.Info("exit the global tso primary election loop") + log.Info("exit the global tso primary election loop", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) return default: } @@ -519,11 +545,13 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { } if primary != nil { log.Info("start to watch the primary", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name()), zap.Stringer("tso-primary", primary)) // Watch will keep looping and never return unless the primary has changed. 
primary.Watch(gta.ctx) - log.Info("the tso primary has changed, try to re-campaign a primary") + log.Info("the tso primary has changed, try to re-campaign a primary", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) } gta.campaignLeader() @@ -531,16 +559,21 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { } func (gta *GlobalTSOAllocator) campaignLeader() { - log.Info("start to campaign the primary", zap.String("campaign-tso-primary-name", gta.member.Name())) + log.Info("start to campaign the primary", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("campaign-tso-primary-name", gta.member.Name())) if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil { if errors.Is(err, errs.ErrEtcdTxnConflict) { log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) } else if errors.Is(err, errs.ErrCheckCampaign) { log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) } else { log.Error("campaign tso primary meets error due to etcd error", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name()), errs.ZapError(err)) } return @@ -559,16 +592,22 @@ func (gta *GlobalTSOAllocator) campaignLeader() { // maintain the the leadership, after this, TSO can be service. gta.member.KeepLeader(ctx) - log.Info("campaign tso primary ok", zap.String("campaign-tso-primary-name", gta.member.Name())) + log.Info("campaign tso primary ok", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("campaign-tso-primary-name", gta.member.Name())) allocator, err := gta.am.GetAllocator(GlobalDCLocation) if err != nil { - log.Error("failed to get the global tso allocator", errs.ZapError(err)) + log.Error("failed to get the global tso allocator", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + errs.ZapError(err)) return } log.Info("initializing the global tso allocator") if err := allocator.Initialize(0); err != nil { - log.Error("failed to initialize the global tso allocator", errs.ZapError(err)) + log.Error("failed to initialize the global tso allocator", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + errs.ZapError(err)) return } defer func() { @@ -583,7 +622,9 @@ func (gta *GlobalTSOAllocator) campaignLeader() { // TODO: if enable-local-tso is true, check the cluster dc-location after the primary is elected // go gta.tsoAllocatorManager.ClusterDCLocationChecker() - log.Info("tso primary is ready to serve", zap.String("tso-primary-name", gta.member.Name())) + log.Info("tso primary is ready to serve", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("tso-primary-name", gta.member.Name())) leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval) defer leaderTicker.Stop() @@ -592,12 +633,14 @@ func (gta *GlobalTSOAllocator) campaignLeader() { select { case <-leaderTicker.C: if !gta.member.IsLeader() { - log.Info("no longer a primary because lease has expired, the tso primary will step 
down") + log.Info("no longer a primary because lease has expired, the tso primary will step down", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) return } case <-ctx.Done(): // Server is closed and it should return nil. - log.Info("exit leader campaign") + log.Info("exit leader campaign", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) return } } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 4d32ae92c80..6859ae82386 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -360,8 +360,6 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro // If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone. if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 { - log.Warn("configured the default keyspace group but no members/distribution specified. " + - "ignore it for now and fallback to the way of every tso node/pod owning a replica") // TODO: fill members with all tso nodes/pods. group.Members = []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}} } @@ -373,10 +371,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro return } - // If this host is already assigned a replica of this keyspace group, that is to is already initialized, just update the meta. + // If this host is already assigned a replica of this keyspace group, i.e., the election member + // is already initialized, just update the meta. if oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID); oldAM != nil { - log.Info("keyspace group already initialized, so update meta only", - zap.Uint32("keyspace-group-id", group.ID)) kgm.updateKeyspaceGroupMembership(oldGroup, group, true) return } @@ -513,6 +510,12 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( // The keyspace group membership is not changed. Reuse the old one. newGroup.KeyspaceLookupTable = oldKeyspaceLookupTable } else { + // The keyspace list might be too long, so we only log the length, though there is a rare case that + // the old length and the new length are the same but the keyspace list is changed. + log.Info("the keyspace group's keyspace list is changed", + zap.Uint32("keyspace-group-id", groupID), + zap.Int("old-keyspaces-count", oldLen), + zap.Int("new-keyspaces-count", newLen)) // The keyspace group membership is changed. Update the keyspace lookup table. newGroup.KeyspaceLookupTable = make(map[uint32]struct{}) for i, j := 0, 0; i < oldLen || j < newLen; { diff --git a/pkg/utils/logutil/log.go b/pkg/utils/logutil/log.go index a46f47da891..abb6a2783a0 100644 --- a/pkg/utils/logutil/log.go +++ b/pkg/utils/logutil/log.go @@ -152,3 +152,13 @@ type stringer struct { func (s stringer) String() string { return "?" } + +// CondUint32 constructs a field with the given key and value conditionally. +// If the condition is true, it constructs a field with uint32 type; otherwise, +// skip the field. +func CondUint32(key string, val uint32, condition bool) zap.Field { + if condition { + return zap.Uint32(key, val) + } + return zap.Skip() +}