Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lease: keep the Leadership alive immediately after finishing the leader election #7707

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (ls *Leadership) addCampaignTimes() {
}

// Campaign is used to campaign for the leadership with the given lease timeout; it returns an error if the campaign fails.
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
func (ls *Leadership) Campaign(ctx context.Context, leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.addCampaignTimes()
ls.leaderValue = leaderData
// Create a new lease to campaign
Expand All @@ -170,6 +170,9 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
if err := newLease.Grant(leaseTimeout); err != nil {
return err
}
// Start keepalive to maintain the leadership.
ls.Keep(ctx)

finalCmps := make([]clientv3.Cmp, 0, len(cmps)+1)
finalCmps = append(finalCmps, cmps...)
// The leader key must not exist, so the CreateRevision is 0.
Expand Down
36 changes: 16 additions & 20 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,32 @@ func TestLeadership(t *testing.T) {
leadership1 := NewLeadership(client, "/test_leader", "test_leader_1")
leadership2 := NewLeadership(client, "/test_leader", "test_leader_2")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// leadership1 starts first and gets the leadership
err := leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
err := leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1")
re.NoError(err)
// leadership2 starts next and cannot get the leadership
err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2")
err = leadership2.Campaign(ctx, defaultLeaseTimeout, "test_leader_2")
re.Error(err)

re.True(leadership1.Check())
// leadership2 failed, so the check should return false
re.False(leadership2.Check())

// Sleep longer than the defaultLeaseTimeout to wait for the lease expires
// Sleep longer than the defaultLeaseTimeout;
// the keepalive started by Campaign should retain the leadership
time.Sleep((defaultLeaseTimeout + 1) * time.Second)

re.False(leadership1.Check())
re.True(leadership1.Check())
re.False(leadership2.Check())

// Delete the leader key and campaign for leadership1
err = leadership1.DeleteLeaderKey()
re.NoError(err)
err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
err = leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1")
re.NoError(err)
re.True(leadership1.Check())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go leadership1.Keep(ctx)

// Sleep longer than the defaultLeaseTimeout
time.Sleep((defaultLeaseTimeout + 1) * time.Second)
Expand All @@ -76,12 +76,9 @@ func TestLeadership(t *testing.T) {
// Delete the leader key and re-campaign for leadership2
err = leadership1.DeleteLeaderKey()
re.NoError(err)
err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2")
err = leadership2.Campaign(ctx, defaultLeaseTimeout, "test_leader_2")
re.NoError(err)
re.True(leadership2.Check())
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
go leadership2.Keep(ctx)

// Sleep longer than the defaultLeaseTimeout
time.Sleep((defaultLeaseTimeout + 1) * time.Second)
Expand All @@ -95,10 +92,6 @@ func TestLeadership(t *testing.T) {
re.False(leadership1.Check())
re.False(leadership2.Check())

// Try to keep the reset leadership.
leadership1.Keep(ctx)
leadership2.Keep(ctx)

// Check the lease.
lease1 := leadership1.getLease()
re.NotNil(lease1)
Expand Down Expand Up @@ -193,9 +186,12 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe
re.NoError(err)
defer client2.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
err = leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1")
re.NoError(err)
resp, err := client2.Get(context.Background(), leaderKey)
re.NoError(err)
Expand Down Expand Up @@ -229,14 +225,14 @@ func TestRequestProgress(t *testing.T) {
re.NoError(err)
defer client2.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
leaderKey := "/test_leader"
leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
err = leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1")
re.NoError(err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := client2.Get(ctx, leaderKey)
re.NoError(err)
go func() {
Expand Down
42 changes: 26 additions & 16 deletions pkg/encryption/key_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ func newTestKeyFile(t *testing.T, re *require.Assertions, key ...string) (keyFil
return keyFilePath
}

func newTestLeader(re *require.Assertions, client *clientv3.Client) *election.Leadership {
func newTestLeader(ctx context.Context, re *require.Assertions, client *clientv3.Client) *election.Leadership {
leader := election.NewLeadership(client, "test_leader", "test")
timeout := int64(30000000) // about a year.
err := leader.Campaign(timeout, "")
err := leader.Campaign(ctx, timeout, "")
re.NoError(err)
return leader
}
Expand Down Expand Up @@ -148,7 +148,9 @@ func TestNewKeyManagerLoadKeys(t *testing.T) {
// Initialize.
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
leadership := newTestLeader(ctx, re, client)
// Use default config.
config := &Config{}
err := config.Adjust()
Expand Down Expand Up @@ -232,7 +234,9 @@ func TestGetKey(t *testing.T) {
// Initialize.
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
leadership := newTestLeader(ctx, re, client)
// Store initial keys in etcd.
masterKeyMeta := newTestMasterKey(keyFile)
keys := &encryptionpb.KeyDictionary{
Expand Down Expand Up @@ -287,7 +291,9 @@ func TestLoadKeyEmpty(t *testing.T) {
// Initialize.
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
leadership := newTestLeader(ctx, re, client)
// Store initial keys in etcd.
masterKeyMeta := newTestMasterKey(keyFile)
keys := &encryptionpb.KeyDictionary{
Expand Down Expand Up @@ -323,7 +329,7 @@ func TestWatcher(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Listen on watcher event
Expand Down Expand Up @@ -407,7 +413,9 @@ func TestSetLeadershipWithEncryptionOff(t *testing.T) {
re.NoError(err)
re.Nil(m.keys.Load())
// Set leadership
leadership := newTestLeader(re, client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
leadership := newTestLeader(ctx, re, client)
err = m.SetLeadership(leadership)
re.NoError(err)
// Check encryption stays off.
Expand All @@ -424,7 +432,7 @@ func TestSetLeadershipWithEncryptionEnabling(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Listen on watcher event
Expand Down Expand Up @@ -477,7 +485,7 @@ func TestSetLeadershipWithEncryptionMethodChanged(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down Expand Up @@ -553,7 +561,7 @@ func TestSetLeadershipWithCurrentKeyExposed(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down Expand Up @@ -624,7 +632,7 @@ func TestSetLeadershipWithCurrentKeyExpired(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down Expand Up @@ -700,7 +708,7 @@ func TestSetLeadershipWithMasterKeyChanged(t *testing.T) {
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
keyFile2 := newTestKeyFile(t, re, testMasterKey2)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down Expand Up @@ -764,7 +772,9 @@ func TestSetLeadershipMasterKeyWithCiphertextKey(t *testing.T) {
// Initialize.
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down Expand Up @@ -842,7 +852,7 @@ func TestSetLeadershipWithEncryptionDisabling(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Listen on watcher event
Expand Down Expand Up @@ -898,7 +908,7 @@ func TestKeyRotation(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down Expand Up @@ -994,7 +1004,7 @@ func TestKeyRotationConflict(t *testing.T) {
defer cancel()
client := newTestEtcd(t)
keyFile := newTestKeyFile(t, re)
leadership := newTestLeader(re, client)
leadership := newTestLeader(ctx, re, client)
// Setup helper
helper := defaultKeyManagerHelper()
// Mock time
Expand Down
15 changes: 6 additions & 9 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
// Campaign leader and maintain the leadership.
if err := s.participant.CampaignLeader(ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
Expand All @@ -165,19 +168,13 @@ func (s *Server) campaignLeader() {
return
}

// Start keepalive the leadership and enable Resource Manager service.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
log.Info("campaign resource manager primary ok", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
// After the leadership is granted, the Resource Manager is ready to provide service.
var resetLeaderOnce sync.Once
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0)
})

// maintain the leadership, after this, Resource Manager could be ready to provide service.
s.participant.KeepLeader(ctx)
log.Info("campaign resource manager primary ok", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
Expand Down
15 changes: 6 additions & 9 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
// Campaign leader and maintain the leadership.
if err := s.participant.CampaignLeader(ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
Expand All @@ -260,19 +263,13 @@ func (s *Server) campaignLeader() {
return
}

// Start keepalive the leadership and enable Scheduling service.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
log.Info("campaign scheduling primary ok", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
// After the leadership is granted, the Scheduling service is ready to provide service.
var resetLeaderOnce sync.Once
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0)
})

// maintain the leadership, after this, Scheduling could be ready to provide service.
s.participant.KeepLeader(ctx)
log.Info("campaign scheduling primary ok", zap.String("campaign-scheduling-primary-name", s.participant.Name()))

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
if err := cb(ctx); err != nil {
Expand Down
9 changes: 2 additions & 7 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
// leader should be changed when campaign leader frequently.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
failpoint.Inject("skipCampaignLeaderCheck", func() {
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
failpoint.Return(m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue()))
})

if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
Expand All @@ -194,12 +194,7 @@ func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout in
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}

return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}

// KeepLeader is used to keep the PD leader's leadership.
func (m *EmbeddedEtcdMember) KeepLeader(ctx context.Context) {
m.leadership.Keep(ctx)
return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue())
}

// PreCheckLeader does some pre-check before checking whether it's the leader.
Expand Down
11 changes: 3 additions & 8 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,14 @@ func (m *Participant) GetLeadership() *election.Leadership {
}

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(_ context.Context, leaseTimeout int64) error {
func (m *Participant) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue())
}

// KeepLeader is used to keep the leader's leadership.
func (m *Participant) KeepLeader(ctx context.Context) {
m.leadership.Keep(ctx)
}

// PreCheckLeader does some pre-check before checking whether or not it's the leader.
// PreCheckLeader does some pre-check before checking whether it's the leader.
// It returns true if it passes the pre-check, false otherwise.
func (m *Participant) PreCheckLeader() error {
// No specific thing to check. Returns no error.
Expand Down
Loading
Loading