Skip to content

Commit a009ea2

Browse files
Proper leader election without using observe and TTL features
1 parent afa1784 commit a009ea2

File tree

6 files changed

+271
-247
lines changed

6 files changed

+271
-247
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [master](https://github.com/arangodb-helper/arangodb/tree/master) (N/A)
44
- Fix context handling in WaitUntilStarterReady for tests
5+
- Proper leader election without using observe and TTL features
56

67
## [0.15.8](https://github.com/arangodb-helper/arangodb/tree/0.15.8) (2023-06-02)
78
- Add passing ARANGODB_SERVER_DIR env variable when starting arangod instances

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ DOCKER_CMD = $(DOCKERCLI) run \
105105
-e GOARCH=$(GOARCH) \
106106
-e CGO_ENABLED=0 \
107107
-e TRAVIS=$(TRAVIS) \
108+
-e VERBOSE=$(VERBOSE) \
108109
$(DOCKER_PARAMS) \
109110
-w /usr/code/ \
110111
$(DOCKER_IMAGE)

pkg/election/leader_election.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package election
22+
23+
import (
24+
"context"
25+
"time"
26+
27+
"github.com/arangodb/go-driver"
28+
"github.com/arangodb/go-driver/agency"
29+
)
30+
31+
func NewLeaderElectionCell[T comparable](c agency.Agency, key []string, ttl time.Duration) *LeaderElectionCell[T] {
32+
return &LeaderElectionCell[T]{
33+
agency: c,
34+
lastTTL: 0,
35+
leading: false,
36+
key: key,
37+
ttl: ttl,
38+
}
39+
}
40+
41+
type LeaderElectionCell[T comparable] struct {
42+
agency agency.Agency
43+
lastTTL int64
44+
leading bool
45+
key []string
46+
ttl time.Duration
47+
}
48+
49+
// leaderStruct is the JSON document written to and read from the leader key.
type leaderStruct[T comparable] struct {
	// Data is the value published by whoever wrote the entry.
	Data T `json:"data,omitempty"`
	// TTL is the expiry of the entry as a Unix timestamp in seconds.
	TTL int64 `json:"ttl,omitempty"`
}
53+
54+
func (l *LeaderElectionCell[T]) tryBecomeLeader(ctx context.Context, value T, assumeEmpty bool) error {
55+
trx := agency.NewTransaction("", agency.TransactionOptions{})
56+
57+
newTTL := time.Now().Add(l.ttl).Unix()
58+
trx.AddKey(agency.NewKeySet(l.key, leaderStruct[T]{Data: value, TTL: newTTL}, 0))
59+
if assumeEmpty {
60+
trx.AddCondition(l.key, agency.NewConditionOldEmpty(true))
61+
} else {
62+
key := append(l.key, "ttl")
63+
trx.AddCondition(key, agency.NewConditionIfEqual(l.lastTTL))
64+
}
65+
66+
if err := l.agency.WriteTransaction(ctx, trx); err == nil {
67+
l.lastTTL = newTTL
68+
l.leading = true
69+
} else {
70+
return err
71+
}
72+
73+
return nil
74+
}
75+
76+
func (l *LeaderElectionCell[T]) Read(ctx context.Context) (T, error) {
77+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
78+
defer cancel()
79+
80+
var result leaderStruct[T]
81+
if err := l.agency.ReadKey(ctx, l.key, &result); err != nil {
82+
var def T
83+
if agency.IsKeyNotFound(err) {
84+
return def, nil
85+
}
86+
return def, err
87+
}
88+
return result.Data, nil
89+
}
90+
91+
// Update checks the current leader cell and if no leader is present
92+
// it tries to put itself in there. Will return the value currently present,
93+
// whether we are leader and a duration after which Updated should be called again.
94+
func (l *LeaderElectionCell[T]) Update(ctx context.Context, value T) (T, bool, time.Duration, error) {
95+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
96+
defer cancel()
97+
for {
98+
assumeEmpty := false
99+
var result leaderStruct[T]
100+
if err := l.agency.ReadKey(ctx, l.key, &result); err != nil {
101+
if agency.IsKeyNotFound(err) {
102+
assumeEmpty = true
103+
goto tryLeaderElection
104+
}
105+
assumeEmpty = false
106+
}
107+
108+
{
109+
now := time.Now()
110+
if result.TTL < now.Unix() {
111+
l.lastTTL = result.TTL
112+
l.leading = false
113+
goto tryLeaderElection
114+
}
115+
116+
if result.TTL == l.lastTTL && l.leading {
117+
// try to update the ttl
118+
goto tryLeaderElection
119+
} else {
120+
// some new leader has been established
121+
l.lastTTL = result.TTL
122+
l.leading = false
123+
return result.Data, false, time.Duration(l.lastTTL - now.Unix()), nil
124+
}
125+
}
126+
127+
tryLeaderElection:
128+
if err := l.tryBecomeLeader(ctx, value, assumeEmpty); err == nil {
129+
return value, true, l.ttl / 2, nil
130+
} else if !driver.IsPreconditionFailed(err) {
131+
var def T
132+
return def, false, 0, err
133+
}
134+
}
135+
}
136+
137+
// Resign tries to resign leadership
138+
func (l *LeaderElectionCell[T]) Resign(ctx context.Context) error {
139+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
140+
defer cancel()
141+
142+
// delete the key with precondition that ttl is as expected
143+
if !l.leading {
144+
return nil
145+
}
146+
l.leading = false
147+
trx := agency.NewTransaction("", agency.TransactionOptions{})
148+
key := append(l.key, "ttl")
149+
trx.AddCondition(key, agency.NewConditionIfEqual(l.lastTTL))
150+
trx.AddKey(agency.NewKeyDelete(l.key))
151+
return l.agency.WriteTransaction(ctx, trx)
152+
}

0 commit comments

Comments
 (0)