Skip to content

Commit fe26e44

Browse files
agency Lock reworked without using TTL feature
1 parent f729d32 commit fe26e44

File tree

5 files changed

+227
-9
lines changed

5 files changed

+227
-9
lines changed

pkg/election/leader_election.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,13 @@ import (
2424
"context"
2525
"time"
2626

27-
"github.com/rs/zerolog"
28-
2927
"github.com/arangodb/go-driver"
3028
"github.com/arangodb/go-driver/agency"
3129
)
3230

33-
func NewLeaderElectionCell[T comparable](l zerolog.Logger, c agency.Agency, key []string, ttl time.Duration) *LeaderElectionCell[T] {
31+
func NewLeaderElectionCell[T comparable](c agency.Agency, key []string, ttl time.Duration) *LeaderElectionCell[T] {
3432
return &LeaderElectionCell[T]{
3533
agency: c,
36-
log: l,
3734
lastTTL: 0,
3835
leading: false,
3936
key: key,
@@ -43,7 +40,6 @@ func NewLeaderElectionCell[T comparable](l zerolog.Logger, c agency.Agency, key
4340

4441
type LeaderElectionCell[T comparable] struct {
4542
agency agency.Agency
46-
log zerolog.Logger
4743
lastTTL int64
4844
leading bool
4945
key []string
@@ -106,7 +102,6 @@ func (l *LeaderElectionCell[T]) Update(ctx context.Context, value T) (T, bool, t
106102
assumeEmpty = true
107103
goto tryLeaderElection
108104
}
109-
l.log.Warn().Err(err).Msg("Error while reading leader election key")
110105
assumeEmpty = false
111106
}
112107

pkg/election/lock.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018-2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package election
22+
23+
import (
24+
"context"
25+
"crypto/rand"
26+
"encoding/hex"
27+
"sync"
28+
"time"
29+
30+
"github.com/arangodb/go-driver"
31+
"github.com/arangodb/go-driver/agency"
32+
)
33+
34+
const (
35+
minLockTTL = time.Second * 5
36+
)
37+
38+
// Lock is an agency backed exclusive lock.
39+
type Lock interface {
40+
// Lock tries to lock the lock.
41+
// If it is not possible to lock, an error is returned.
42+
// If the lock is already held by me, an error is returned.
43+
Lock(ctx context.Context) error
44+
45+
// Unlock tries to unlock the lock.
46+
// If it is not possible to unlock, an error is returned.
47+
// If the lock is not held by me, an error is returned.
48+
Unlock(ctx context.Context) error
49+
50+
// IsLocked return true if the lock is held by me.
51+
IsLocked() bool
52+
}
53+
54+
// Logger abstracts a logger.
55+
type Logger interface {
56+
Errorf(msg string, args ...interface{})
57+
}
58+
59+
// NewLock creates a new lock on the given key.
60+
func NewLock(log Logger, api agency.Agency, key []string, id string, ttl time.Duration) (Lock, error) {
61+
if ttl < minLockTTL {
62+
ttl = minLockTTL
63+
}
64+
if id == "" {
65+
randBytes := make([]byte, 16)
66+
_, err := rand.Read(randBytes)
67+
if err != nil {
68+
return nil, err
69+
}
70+
id = hex.EncodeToString(randBytes)
71+
}
72+
return &lock{
73+
log: log,
74+
id: id,
75+
cell: NewLeaderElectionCell[string](api, key, ttl),
76+
}, nil
77+
}
78+
79+
type lock struct {
80+
mutex sync.Mutex
81+
log Logger
82+
83+
cell *LeaderElectionCell[string]
84+
85+
id string
86+
cancelRenewal func()
87+
}
88+
89+
// Lock tries to lock the lock.
90+
// If it is not possible to lock, an error is returned.
91+
// If the lock is already held by me, an error is returned.
92+
func (l *lock) Lock(ctx context.Context) error {
93+
l.mutex.Lock()
94+
defer l.mutex.Unlock()
95+
96+
if l.cell.leading {
97+
return driver.WithStack(AlreadyLockedError)
98+
}
99+
100+
_, isLocked, nextUpdateIn, err := l.cell.Update(ctx, l.id)
101+
if err != nil {
102+
return err
103+
}
104+
105+
if !isLocked {
106+
// locked by someone
107+
return driver.WithStack(AlreadyLockedError)
108+
}
109+
110+
// Keep renewing
111+
renewCtx, renewCancel := context.WithCancel(context.Background())
112+
go l.renewLock(renewCtx, nextUpdateIn)
113+
l.cancelRenewal = renewCancel
114+
115+
return nil
116+
}
117+
118+
// Unlock tries to unlock the lock.
119+
// If it is not possible to unlock, an error is returned.
120+
// If the lock is not held by me, an error is returned.
121+
func (l *lock) Unlock(ctx context.Context) error {
122+
l.mutex.Lock()
123+
defer l.mutex.Unlock()
124+
125+
if !l.cell.leading {
126+
return driver.WithStack(NotLockedError)
127+
}
128+
129+
err := l.cell.Resign(ctx)
130+
if err != nil {
131+
return err
132+
}
133+
134+
defer func() {
135+
if l.cancelRenewal != nil {
136+
l.cancelRenewal()
137+
l.cancelRenewal = nil
138+
}
139+
}()
140+
141+
return nil
142+
}
143+
144+
// IsLocked return true if the lock is held by me.
145+
func (l *lock) IsLocked() bool {
146+
l.mutex.Lock()
147+
defer l.mutex.Unlock()
148+
return l.cell.leading
149+
}
150+
151+
// renewLock keeps renewing the lock until the given context is canceled.
152+
func (l *lock) renewLock(ctx context.Context, delay time.Duration) {
153+
op := func() (bool, time.Duration, error) {
154+
l.mutex.Lock()
155+
defer l.mutex.Unlock()
156+
157+
if !l.cell.leading {
158+
return true, 0, nil
159+
}
160+
161+
_, stillLeading, newDelay, err := l.cell.Update(ctx, l.id)
162+
return stillLeading, newDelay, err
163+
}
164+
for {
165+
var leading bool
166+
var err error
167+
leading, delay, err = op()
168+
if err != nil {
169+
if l.log != nil && driver.Cause(err) != context.Canceled {
170+
l.log.Errorf("Failed to renew lock %s. %v", l.cell.key, err)
171+
}
172+
delay = time.Second
173+
}
174+
if !leading || driver.Cause(err) == context.Canceled {
175+
return
176+
}
177+
178+
timer := time.NewTimer(delay)
179+
select {
180+
case <-timer.C:
181+
// Delay over, just continue
182+
case <-ctx.Done():
183+
// We're asked to stop
184+
if !timer.Stop() {
185+
<-timer.C
186+
}
187+
return
188+
}
189+
}
190+
}

pkg/election/lock_errors.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018-2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package election
22+
23+
import (
24+
"errors"
25+
)
26+
27+
var (
28+
// AlreadyLockedError indicates that the lock is already locked.
29+
AlreadyLockedError = errors.New("already locked")
30+
// NotLockedError indicates that the lock is not locked when trying to unlock.
31+
NotLockedError = errors.New("not locked")
32+
)

service/runtime_cluster_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context,
108108
}
109109

110110
func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, agencyClient agency.Agency, myURL string) {
111-
le := election.NewLeaderElectionCell[string](s.log, agencyClient, masterURLKey, masterURLTTL)
111+
le := election.NewLeaderElectionCell[string](agencyClient, masterURLKey, masterURLTTL)
112112

113113
var err error
114114
var delay time.Duration

service/upgrade_manager.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242

4343
"github.com/arangodb-helper/arangodb/client"
4444
"github.com/arangodb-helper/arangodb/pkg/definitions"
45+
"github.com/arangodb-helper/arangodb/pkg/election"
4546
"github.com/arangodb-helper/arangodb/pkg/trigger"
4647
)
4748

@@ -303,7 +304,7 @@ func (m *upgradeManager) StartDatabaseUpgrade(ctx context.Context, forceMinorUpg
303304
return maskAny(err)
304305
}
305306
m.log.Debug().Msg("Creating lock")
306-
lock, err := agency.NewLock(m, api, upgradeManagerLockKey, "", upgradeManagerLockTTL)
307+
lock, err := election.NewLock(m, api, upgradeManagerLockKey, "", upgradeManagerLockTTL)
307308
if err != nil {
308309
return maskAny(err)
309310
}
@@ -546,7 +547,7 @@ func (m *upgradeManager) AbortDatabaseUpgrade(ctx context.Context) error {
546547
return maskAny(err)
547548
}
548549
m.log.Debug().Msg("Creating lock")
549-
lock, err := agency.NewLock(m, api, upgradeManagerLockKey, "", upgradeManagerLockTTL)
550+
lock, err := election.NewLock(m, api, upgradeManagerLockKey, "", upgradeManagerLockTTL)
550551
if err != nil {
551552
return maskAny(err)
552553
}

0 commit comments

Comments
 (0)