Skip to content

Add leader election and agency Lock implementation #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions pkg/arangod/agency/election/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package election

import (
"context"
"time"

driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
)

func NewLeaderElectionCell[T comparable](c agency.Agency, key []string, ttl time.Duration) *LeaderElectionCell[T] {
return &LeaderElectionCell[T]{
agency: c,
lastTTL: 0,
leading: false,
key: key,
ttl: ttl,
}
}

type LeaderElectionCell[T comparable] struct {
agency agency.Agency
lastTTL int64
leading bool
key []string
ttl time.Duration
}

type leaderStruct[T comparable] struct {
Data T `json:"data,omitempty"`
TTL int64 `json:"ttl,omitempty"`
}

func (l *LeaderElectionCell[T]) tryBecomeLeader(ctx context.Context, value T, assumeEmpty bool) error {
trx := agency.NewTransaction("", agency.TransactionOptions{})

newTTL := time.Now().Add(l.ttl).Unix()
trx.AddKey(agency.NewKeySet(l.key, leaderStruct[T]{Data: value, TTL: newTTL}, 0))
if assumeEmpty {
trx.AddCondition(l.key, agency.NewConditionOldEmpty(true))
} else {
key := append(l.key, "ttl")
trx.AddCondition(key, agency.NewConditionIfEqual(l.lastTTL))
}

if err := l.agency.WriteTransaction(ctx, trx); err == nil {
l.lastTTL = newTTL
l.leading = true
} else {
return err
}

return nil
}

func (l *LeaderElectionCell[T]) Read(ctx context.Context) (T, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

var result leaderStruct[T]
if err := l.agency.ReadKey(ctx, l.key, &result); err != nil {
var def T
if agency.IsKeyNotFound(err) {
return def, nil
}
return def, err
}
return result.Data, nil
}

// Update checks the current leader cell and if no leader is present
// it tries to put itself in there. Will return the value currently present,
// whether we are leader and a duration after which Updated should be called again.
func (l *LeaderElectionCell[T]) Update(ctx context.Context, value T) (T, bool, time.Duration, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
for {
assumeEmpty := false
var result leaderStruct[T]
if err := l.agency.ReadKey(ctx, l.key, &result); err != nil {
if agency.IsKeyNotFound(err) {
assumeEmpty = true
goto tryLeaderElection
}
assumeEmpty = false
}

{
now := time.Now()
if result.TTL < now.Unix() {
l.lastTTL = result.TTL
l.leading = false
goto tryLeaderElection
}

if result.TTL == l.lastTTL && l.leading {
// try to update the ttl
goto tryLeaderElection
} else {
// some new leader has been established
l.lastTTL = result.TTL
l.leading = false
return result.Data, false, time.Unix(l.lastTTL, 0).Sub(now), nil
}
}

tryLeaderElection:
if err := l.tryBecomeLeader(ctx, value, assumeEmpty); err == nil {
return value, true, l.ttl / 2, nil
} else if !driver.IsPreconditionFailed(err) {
var def T
return def, false, 0, err
}
}
}

// Resign tries to resign leadership
func (l *LeaderElectionCell[T]) Resign(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// delete the key with precondition that ttl is as expected
if !l.leading {
return nil
}
l.leading = false
trx := agency.NewTransaction("", agency.TransactionOptions{})
key := append(l.key, "ttl")
trx.AddCondition(key, agency.NewConditionIfEqual(l.lastTTL))
trx.AddKey(agency.NewKeyDelete(l.key))
return l.agency.WriteTransaction(ctx, trx)
}
190 changes: 190 additions & 0 deletions pkg/arangod/agency/election/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
//
// DISCLAIMER
//
// Copyright 2018-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package election

import (
"context"
"crypto/rand"
"encoding/hex"
"sync"
"time"

driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
)

const (
minLockTTL = time.Second * 5
)

// Lock is an agency backed exclusive lock.
type Lock interface {
// Lock tries to lock the lock.
// If it is not possible to lock, an error is returned.
// If the lock is already held by me, an error is returned.
Lock(ctx context.Context) error

// Unlock tries to unlock the lock.
// If it is not possible to unlock, an error is returned.
// If the lock is not held by me, an error is returned.
Unlock(ctx context.Context) error

// IsLocked return true if the lock is held by me.
IsLocked() bool
}

// Logger abstracts a logger.
type Logger interface {
Errorf(msg string, args ...interface{})
}

// NewLock creates a new lock on the given key.
func NewLock(log Logger, api agency.Agency, key []string, id string, ttl time.Duration) (Lock, error) {
if ttl < minLockTTL {
ttl = minLockTTL
}
if id == "" {
randBytes := make([]byte, 16)
_, err := rand.Read(randBytes)
if err != nil {
return nil, err
}
id = hex.EncodeToString(randBytes)
}
return &lock{
log: log,
id: id,
cell: NewLeaderElectionCell[string](api, key, ttl),
}, nil
}

type lock struct {
mutex sync.Mutex
log Logger

cell *LeaderElectionCell[string]

id string
cancelRenewal func()
}

// Lock tries to lock the lock.
// If it is not possible to lock, an error is returned.
// If the lock is already held by me, an error is returned.
func (l *lock) Lock(ctx context.Context) error {
l.mutex.Lock()
defer l.mutex.Unlock()

if l.cell.leading {
return driver.WithStack(AlreadyLockedError)
}

_, isLocked, nextUpdateIn, err := l.cell.Update(ctx, l.id)
if err != nil {
return err
}

if !isLocked {
// locked by someone
return driver.WithStack(AlreadyLockedError)
}

// Keep renewing
renewCtx, renewCancel := context.WithCancel(context.Background())
go l.renewLock(renewCtx, nextUpdateIn)
l.cancelRenewal = renewCancel

return nil
}

// Unlock tries to unlock the lock.
// If it is not possible to unlock, an error is returned.
// If the lock is not held by me, an error is returned.
func (l *lock) Unlock(ctx context.Context) error {
l.mutex.Lock()
defer l.mutex.Unlock()

if !l.cell.leading {
return driver.WithStack(NotLockedError)
}

err := l.cell.Resign(ctx)
if err != nil {
return err
}

defer func() {
if l.cancelRenewal != nil {
l.cancelRenewal()
l.cancelRenewal = nil
}
}()

return nil
}

// IsLocked return true if the lock is held by me.
func (l *lock) IsLocked() bool {
l.mutex.Lock()
defer l.mutex.Unlock()
return l.cell.leading
}

// renewLock keeps renewing the lock until the given context is canceled.
func (l *lock) renewLock(ctx context.Context, delay time.Duration) {
op := func() (bool, time.Duration, error) {
l.mutex.Lock()
defer l.mutex.Unlock()

if !l.cell.leading {
return true, 0, nil
}

_, stillLeading, newDelay, err := l.cell.Update(ctx, l.id)
return stillLeading, newDelay, err
}
for {
var leading bool
var err error
leading, delay, err = op()
if err != nil {
if l.log != nil && driver.Cause(err) != context.Canceled {
l.log.Errorf("Failed to renew lock %s. %v", l.cell.key, err)
}
delay = time.Second
}
if !leading || driver.Cause(err) == context.Canceled {
return
}

timer := time.NewTimer(delay)
select {
case <-timer.C:
// Delay over, just continue
case <-ctx.Done():
// We're asked to stop
if !timer.Stop() {
<-timer.C
}
return
}
}
}
32 changes: 32 additions & 0 deletions pkg/arangod/agency/election/lock_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//
// DISCLAIMER
//
// Copyright 2018-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package election

import (
"errors"
)

var (
// AlreadyLockedError indicates that the lock is already locked.
AlreadyLockedError = errors.New("already locked")
// NotLockedError indicates that the lock is not locked when trying to unlock.
NotLockedError = errors.New("not locked")
)