Skip to content

Commit

Permalink
Include SetNXOnExtend as Option
Browse files Browse the repository at this point in the history
  • Loading branch information
palcalde committed Feb 15, 2024
1 parent dc7ca9e commit 9dbb9f3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 8 deletions.
27 changes: 21 additions & 6 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ type Mutex struct {

quorum int

genValueFunc func() (string, error)
value string
until time.Time
shuffle bool
failFast bool
genValueFunc func() (string, error)
value string
until time.Time
shuffle bool
failFast bool
setNXOnExtend bool

pools []redis.Pool
}
Expand Down Expand Up @@ -257,7 +258,7 @@ func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (boo
return status != int64(0), nil
}

var touchScript = redis.NewScript(1, `
var touchWithSetNXScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
elseif redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
Expand All @@ -267,12 +268,26 @@ var touchScript = redis.NewScript(1, `
end
`)

var touchScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`)

func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()

touchScript := touchScript
if m.setNXOnExtend {
touchScript = touchWithSetNXScript
}

status, err := conn.Eval(touchScript, m.name, value, expiry)
if err != nil {
// extend failed: clean up locks
Expand Down
29 changes: 27 additions & 2 deletions mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestMutexExtend(t *testing.T) {
}
}

func TestMutexExtendExpiredAcquiresLockAgain(t *testing.T) {
func TestMutexExtendExpired(t *testing.T) {
for k, v := range makeCases(8) {
t.Run(k, func(t *testing.T) {
mutexes := newTestMutexes(v.pools, k+"-test-mutex-extend", 1)
Expand All @@ -154,6 +154,30 @@ func TestMutexExtendExpiredAcquiresLockAgain(t *testing.T) {

time.Sleep(1 * time.Second)

_, err = mutex.Extend()
if err == nil {
t.Fatalf("mutex extend didn't fail")
}
})
}
}

func TestSetNXOnExtendAcquiresLockWhenKeyIsExpired(t *testing.T) {
for k, v := range makeCases(8) {
t.Run(k, func(t *testing.T) {
mutexes := newTestMutexes(v.pools, k+"-test-mutex-extend", 1)
mutex := mutexes[0]
mutex.setNXOnExtend = true
mutex.expiry = 500 * time.Millisecond

err := mutex.Lock()
if err != nil {
t.Fatalf("mutex lock failed: %s", err)
}
defer mutex.Unlock()

time.Sleep(1 * time.Second)

_, err = mutex.Extend()
if err != nil {
t.Fatalf("mutex didn't extend")
Expand All @@ -162,7 +186,7 @@ func TestMutexExtendExpiredAcquiresLockAgain(t *testing.T) {
}
}

func TestMutexExtendExpiredFailsIfLockIsAlreadyTaken(t *testing.T) {
func TestSetNXOnExtendFailsToAcquireLockWhenKeyIsTaken(t *testing.T) {
for k, v := range makeCases(8) {
t.Run(k, func(t *testing.T) {
firstMutex := newTestMutexes(v.pools, k+"-test-mutex-extend", 1)[0]
Expand All @@ -185,6 +209,7 @@ func TestMutexExtendExpiredFailsIfLockIsAlreadyTaken(t *testing.T) {
}

ok, err := firstMutex.Extend()
firstMutex.setNXOnExtend = true
if err == nil {
t.Fatalf("mutex extend didn't fail")
}
Expand Down
10 changes: 10 additions & 0 deletions redsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ func WithRetryDelay(delay time.Duration) Option {
})
}

// WithSetNXOnExtend improves extending logic to extend the key if exist
// and if not, tries to set a new key in redis
// Useful if your redises restart often and you want to reduce the chances of losing the lock
// Read this MR for more info: https://github.com/go-redsync/redsync/pull/149
func WithSetNXOnExtend() Option {
return OptionFunc(func(m *Mutex) {
m.setNXOnExtend = true
})
}

// WithRetryDelayFunc can be used to override default delay behavior.
func WithRetryDelayFunc(delayFunc DelayFunc) Option {
return OptionFunc(func(m *Mutex) {
Expand Down

0 comments on commit 9dbb9f3

Please sign in to comment.