Skip to content

Refactor globallock #31933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions modules/globallock/globallock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ func DefaultLocker() Locker {

// Lock tries to acquire a lock for the given key, it uses the default locker.
// Read the documentation of Locker.Lock for more information about the behavior.
func Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) {
func Lock(ctx context.Context, key string) (ReleaseFunc, error) {
return DefaultLocker().Lock(ctx, key)
}

// TryLock tries to acquire a lock for the given key, it uses the default locker.
// Read the documentation of Locker.TryLock for more information about the behavior.
func TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) {
func TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
return DefaultLocker().TryLock(ctx, key)
}

// LockAndDo tries to acquire a lock for the given key and then calls the given function.
// It uses the default locker, and it will return an error if failed to acquire the lock.
func LockAndDo(ctx context.Context, key string, f func(context.Context) error) error {
ctx, release, err := Lock(ctx, key)
release, err := Lock(ctx, key)
if err != nil {
return err
}
Expand All @@ -52,7 +52,7 @@ func LockAndDo(ctx context.Context, key string, f func(context.Context) error) e
// TryLockAndDo tries to acquire a lock for the given key and then calls the given function.
// It uses the default locker, and it will return false if failed to acquire the lock.
func TryLockAndDo(ctx context.Context, key string, f func(context.Context) error) (bool, error) {
ok, ctx, release, err := TryLock(ctx, key)
ok, release, err := TryLock(ctx, key)
if err != nil {
return false, err
}
Expand Down
30 changes: 4 additions & 26 deletions modules/globallock/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,34 @@ package globallock

import (
"context"
"fmt"
)

type Locker interface {
// Lock tries to acquire a lock for the given key, it blocks until the lock is acquired or the context is canceled.
//
// Lock returns a new context which should be used in the following code.
// The new context will be canceled when the lock is released or lost - yes, it's possible to lose a lock.
// For example, it lost the connection to the redis server while holding the lock.
// If it fails to acquire the lock, the returned context will be the same as the input context.
//
// Lock returns a ReleaseFunc to release the lock, it cannot be nil.
// It's always safe to call this function even if it fails to acquire the lock, and it will do nothing in that case.
// And it's also safe to call it multiple times, but it will only release the lock once.
// That's why it's called ReleaseFunc, not UnlockFunc.
// But be aware that it's not safe to not call it at all; it could lead to a memory leak.
// So a recommended pattern is to use defer to call it:
// ctx, release, err := locker.Lock(ctx, "key")
// if err != nil {
// return err
// }
// defer release()
// The ReleaseFunc will return the original context which was used to acquire the lock.
// It's useful when you want to continue to do something after releasing the lock.
// At that time, the ctx will be canceled, and you can use the returned context by the ReleaseFunc to continue:
// ctx, release, err := locker.Lock(ctx, "key")
// release, err := locker.Lock(ctx, "key")
// if err != nil {
// return err
// }
// defer release()
// doSomething(ctx)
// ctx = release()
// doSomethingElse(ctx)
// Please ignore it and use `defer release()` instead if you don't need this, to avoid forgetting to release the lock.
//
// Lock returns an error if failed to acquire the lock.
// Be aware that even the context is not canceled, it's still possible to fail to acquire the lock.
// For example, redis is down, or it reached the maximum number of tries.
Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error)
Lock(ctx context.Context, key string) (ReleaseFunc, error)

// TryLock tries to acquire a lock for the given key, it returns immediately.
// It follows the same pattern as Lock, but it doesn't block.
// And if it fails to acquire the lock because it's already locked, not other reasons like redis is down,
// it will return false without any error.
TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error)
TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error)
}

// ReleaseFunc is a function that releases a lock.
// It returns the original context which was used to acquire the lock.
type ReleaseFunc func() context.Context

// ErrLockReleased is used as context cause when a lock is released
var ErrLockReleased = fmt.Errorf("lock released")
type ReleaseFunc func()
70 changes: 20 additions & 50 deletions modules/globallock/locker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,24 @@ func TestLocker(t *testing.T) {
func testLocker(t *testing.T, locker Locker) {
t.Run("lock", func(t *testing.T) {
parentCtx := context.Background()
ctx, release, err := locker.Lock(parentCtx, "test")
release, err := locker.Lock(parentCtx, "test")
defer release()

assert.NotEqual(t, parentCtx, ctx) // new context should be returned
assert.NoError(t, err)

func() {
parentCtx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ctx, release, err := locker.Lock(parentCtx, "test")
release, err := locker.Lock(ctx, "test")
defer release()

assert.Error(t, err)
assert.Equal(t, parentCtx, ctx) // should return the same context
}()

release()
assert.Error(t, ctx.Err())

func() {
_, release, err := locker.Lock(context.Background(), "test")
release, err := locker.Lock(context.Background(), "test")
defer release()

assert.NoError(t, err)
Expand All @@ -76,29 +73,26 @@ func testLocker(t *testing.T, locker Locker) {

t.Run("try lock", func(t *testing.T) {
parentCtx := context.Background()
ok, ctx, release, err := locker.TryLock(parentCtx, "test")
ok, release, err := locker.TryLock(parentCtx, "test")
defer release()

assert.True(t, ok)
assert.NotEqual(t, parentCtx, ctx) // new context should be returned
assert.NoError(t, err)

func() {
parentCtx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ok, ctx, release, err := locker.TryLock(parentCtx, "test")
ok, release, err := locker.TryLock(ctx, "test")
defer release()

assert.False(t, ok)
assert.NoError(t, err)
assert.Equal(t, parentCtx, ctx) // should return the same context
}()

release()
assert.Error(t, ctx.Err())

func() {
ok, _, release, _ := locker.TryLock(context.Background(), "test")
ok, release, _ := locker.TryLock(context.Background(), "test")
defer release()

assert.True(t, ok)
Expand All @@ -107,15 +101,15 @@ func testLocker(t *testing.T, locker Locker) {

t.Run("wait and acquired", func(t *testing.T) {
ctx := context.Background()
_, release, err := locker.Lock(ctx, "test")
release, err := locker.Lock(ctx, "test")
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
started := time.Now()
_, release, err := locker.Lock(context.Background(), "test") // should be blocked for seconds
release, err := locker.Lock(context.Background(), "test") // should be blocked for seconds
defer release()
assert.Greater(t, time.Since(started), time.Second)
assert.NoError(t, err)
Expand All @@ -127,34 +121,15 @@ func testLocker(t *testing.T, locker Locker) {
wg.Wait()
})

t.Run("continue after release", func(t *testing.T) {
ctx := context.Background()

ctxBeforeLock := ctx
ctx, release, err := locker.Lock(ctx, "test")

require.NoError(t, err)
assert.NoError(t, ctx.Err())
assert.NotEqual(t, ctxBeforeLock, ctx)

ctxBeforeRelease := ctx
ctx = release()

assert.NoError(t, ctx.Err())
assert.Error(t, ctxBeforeRelease.Err())

// so it can continue with ctx to do more work
})

t.Run("multiple release", func(t *testing.T) {
ctx := context.Background()

_, release1, err := locker.Lock(ctx, "test")
release1, err := locker.Lock(ctx, "test")
require.NoError(t, err)

release1()

_, release2, err := locker.Lock(ctx, "test")
release2, err := locker.Lock(ctx, "test")
defer release2()
require.NoError(t, err)

Expand All @@ -163,7 +138,7 @@ func testLocker(t *testing.T, locker Locker) {
// and it shouldn't affect the other lock
release1()

ok, _, release3, err := locker.TryLock(ctx, "test")
ok, release3, err := locker.TryLock(ctx, "test")
defer release3()
require.NoError(t, err)
// It should be able to acquire the lock;
Expand All @@ -184,28 +159,23 @@ func testRedisLocker(t *testing.T, locker *redisLocker) {
// Otherwise, it will affect other tests.
t.Run("close", func(t *testing.T) {
assert.NoError(t, locker.Close())
_, _, err := locker.Lock(context.Background(), "test")
_, err := locker.Lock(context.Background(), "test")
assert.Error(t, err)
})
}()

t.Run("failed extend", func(t *testing.T) {
ctx, release, err := locker.Lock(context.Background(), "test")
release, err := locker.Lock(context.Background(), "test")
defer release()
require.NoError(t, err)

// It simulates that there are some problems with extending like network issues or redis server down.
v, ok := locker.mutexM.Load("test")
require.True(t, ok)
m := v.(*redisMutex)
_, _ = m.mutex.Unlock() // release it to make it impossible to extend

select {
case <-time.After(redisLockExpiry + time.Second):
t.Errorf("lock should be expired")
case <-ctx.Done():
var errTaken *redsync.ErrTaken
assert.ErrorAs(t, context.Cause(ctx), &errTaken)
}
m := v.(*redsync.Mutex)
_, _ = m.Unlock() // release it to make it impossible to extend

// In current design, callers can't know the lock can't be extended.
// Just keep this case to improve the test coverage.
})
}
27 changes: 7 additions & 20 deletions modules/globallock/memory_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@ func NewMemoryLocker() Locker {
return &memoryLocker{}
}

func (l *memoryLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) {
originalCtx := ctx

func (l *memoryLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
if l.tryLock(key) {
ctx, cancel := context.WithCancelCause(ctx)
releaseOnce := sync.Once{}
return ctx, func() context.Context {
return func() {
releaseOnce.Do(func() {
l.locks.Delete(key)
cancel(ErrLockReleased)
})
return originalCtx
}, nil
}

Expand All @@ -39,39 +34,31 @@ func (l *memoryLocker) Lock(ctx context.Context, key string) (context.Context, R
for {
select {
case <-ctx.Done():
return ctx, func() context.Context { return originalCtx }, ctx.Err()
return func() {}, ctx.Err()
case <-ticker.C:
if l.tryLock(key) {
ctx, cancel := context.WithCancelCause(ctx)
releaseOnce := sync.Once{}
return ctx, func() context.Context {
return func() {
releaseOnce.Do(func() {
l.locks.Delete(key)
cancel(ErrLockReleased)
})
return originalCtx
}, nil
}
}
}
}

func (l *memoryLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) {
originalCtx := ctx

func (l *memoryLocker) TryLock(_ context.Context, key string) (bool, ReleaseFunc, error) {
if l.tryLock(key) {
ctx, cancel := context.WithCancelCause(ctx)
releaseOnce := sync.Once{}
return true, ctx, func() context.Context {
return true, func() {
releaseOnce.Do(func() {
cancel(ErrLockReleased)
l.locks.Delete(key)
})
return originalCtx
}, nil
}

return false, ctx, func() context.Context { return originalCtx }, nil
return false, func() {}, nil
}

func (l *memoryLocker) tryLock(key string) bool {
Expand Down
Loading