4
4
"context"
5
5
"fmt"
6
6
"os"
7
+ "strings"
7
8
"sync"
8
9
"time"
9
10
@@ -62,8 +63,8 @@ type IRedisLocker interface {
62
63
//
63
64
// # Note
64
65
//
65
- // 该方法会阻塞住线程直到解锁成功 或者 触发ctx.Done()
66
- UnlockWithContext (ctx context.Context , key string )
66
+ // 该方法会阻塞住线程直到解锁有结果 或者 触发ctx.Done()
67
+ UnlockWithContext (ctx context.Context , key string ) bool
67
68
}
68
69
69
70
var _ IRedisLocker = & redisLockerImpl {}
@@ -104,7 +105,7 @@ type cancelControl struct {
104
105
}
105
106
106
107
type stateListeners struct {
107
- mux * sync.Mutex
108
+ mux * sync.RWMutex
108
109
listeners map [string ]* cancelControl
109
110
}
110
111
@@ -113,7 +114,7 @@ var (
113
114
redisExecuteTimeout = time .Second * 3
114
115
retryInterval = time .Millisecond * 100
115
116
renewalCheckInterval = time .Second * 1
116
- states = & stateListeners {mux : & sync.Mutex {}, listeners : make (map [string ]* cancelControl )}
117
+ states = & stateListeners {mux : & sync.RWMutex {}, listeners : make (map [string ]* cancelControl )}
117
118
)
118
119
119
120
// TryLock redis锁-尝试上锁
@@ -142,6 +143,10 @@ func (rl *redisLockerImpl) TryLock(key string) bool {
142
143
//
143
144
// 该方法会立即返回锁定成功与否的结果
144
145
func (rl * redisLockerImpl ) TryLockWithContext (ctx context.Context , key string ) bool {
146
+ if ! canDoLockPreflight (key ) {
147
+ return false
148
+ }
149
+
145
150
lockOk , lockErr := rl .client .SetNX (ctx , key , lockerValue (), lockTTL ).Result ()
146
151
if lockErr != nil {
147
152
fmt .Printf ("[Go-Sail] <redisLock> key: %s lock err: %v\n " , key , lockErr )
@@ -174,6 +179,7 @@ func (rl *redisLockerImpl) Lock(ctx context.Context, key string) {
174
179
//第一次锁定失败,进行重试操作
175
180
if ! lockOk || lockErr != nil {
176
181
retryTicker := time .NewTicker (retryInterval )
182
+ defer retryTicker .Stop ()
177
183
178
184
LOOP:
179
185
for {
@@ -202,9 +208,19 @@ func (rl *redisLockerImpl) Unlock(key string) bool {
202
208
ctx , cancel := withRedisExecuteTimeout ()
203
209
defer cancel ()
204
210
211
+ //持有者一致性检测(如果获取失败,也认为不符合一致性)
212
+ lv , err := rl .client .Get (ctx , key ).Result ()
213
+ if err != nil {
214
+ fmt .Printf ("[Go-Sail] <redisLock> key: %s unlock get key err: %v\n " , key , err )
215
+ return false
216
+ }
217
+ if ! holderConsistencyDetection (lv ) {
218
+ return false
219
+ }
220
+
205
221
unlockOk , unlockErr := rl .client .Del (ctx , key ).Result ()
206
222
if unlockErr != nil {
207
- fmt .Printf ("[Go-Sail] <redisLock> key: %s unlock error: %v\n " , key , unlockErr )
223
+ fmt .Printf ("[Go-Sail] <redisLock> key: %s unlock delete key error: %v\n " , key , unlockErr )
208
224
}
209
225
210
226
//清理内存数据并终止自动续期
@@ -218,11 +234,21 @@ func (rl *redisLockerImpl) Unlock(key string) bool {
218
234
// UnlockWithContext redis锁-解锁
219
235
//
220
236
// using Del
221
- func (rl * redisLockerImpl ) UnlockWithContext (ctx context.Context , key string ) {
237
+ func (rl * redisLockerImpl ) UnlockWithContext (ctx context.Context , key string ) bool {
238
+ //持有者一致性检测(如果获取失败,也认为不符合一致性)
239
+ lv , err := rl .client .Get (ctx , key ).Result ()
240
+ if err != nil {
241
+ fmt .Printf ("[Go-Sail] <redisLock> key: %s unlock with context get key err: %v\n " , key , err )
242
+ return false
243
+ }
244
+ if ! holderConsistencyDetection (lv ) {
245
+ return false
246
+ }
222
247
unlockOk , unlockErr := rl .client .Del (ctx , key ).Result ()
223
248
224
249
if unlockOk != 1 || unlockErr != nil {
225
250
ticker := time .NewTicker (retryInterval )
251
+ defer ticker .Stop ()
226
252
227
253
LOOP:
228
254
for {
@@ -240,6 +266,8 @@ func (rl *redisLockerImpl) UnlockWithContext(ctx context.Context, key string) {
240
266
241
267
//清理内存数据并终止自动续期
242
268
rl .clearListenerAndStopAutoRenewal (key )
269
+
270
+ return unlockOk == 1
243
271
}
244
272
245
273
// 自动续期
@@ -274,26 +302,35 @@ type keyAndCtrl struct {
274
302
// 2.使用redis pipeline减少RTT
275
303
func (rl * redisLockerImpl ) startRenewalScheduler () {
276
304
doRenewalRound := func () {
305
+ if rl .client == nil {
306
+ fmt .Println ("[Go-Sail] <redisLock> renewal task not emit cause redis client is nil" )
307
+ return
308
+ }
309
+
277
310
states .mux .Lock ()
278
311
if len (states .listeners ) == 0 {
279
312
states .mux .Unlock ()
280
313
//避免空转锁定占用
281
314
return
282
315
}
283
- keys := make ([]* keyAndCtrl , 0 , len (states .listeners ))
316
+ processingKeys := make ([]* keyAndCtrl , 0 , len (states .listeners ))
284
317
for key , ctrl := range states .listeners {
285
- keys = append (keys , & keyAndCtrl {key : key , ctrl : ctrl })
318
+ processingKeys = append (processingKeys , & keyAndCtrl {key : key , ctrl : ctrl })
286
319
}
287
320
states .mux .Unlock ()
288
321
289
322
ctx , cancel := withRedisExecuteTimeout ()
290
323
defer cancel ()
324
+
325
+ validKeys := make ([]* keyAndCtrl , 0 , len (processingKeys ))
326
+ invalidKeys := make ([]* keyAndCtrl , 0 , len (processingKeys ))
291
327
cmds , pipeErr := rl .client .Pipelined (ctx , func (pipe redisLib.Pipeliner ) error {
292
- for index := range keys {
293
- if keys [index ].ctrl .ctx .Err () != nil {
294
- delete ( states . listeners , keys [index ]. key )
328
+ for index := range processingKeys {
329
+ if processingKeys [index ].ctrl .ctx .Err () != nil {
330
+ invalidKeys = append ( invalidKeys , processingKeys [index ])
295
331
} else {
296
- pipe .ExpireXX (ctx , keys [index ].key , lockTTL )
332
+ pipe .Expire (ctx , processingKeys [index ].key , lockTTL )
333
+ validKeys = append (validKeys , processingKeys [index ])
297
334
}
298
335
}
299
336
return nil
@@ -308,12 +345,17 @@ func (rl *redisLockerImpl) startRenewalScheduler() {
308
345
for index := range cmds {
309
346
if expOk , expErr := cmds [index ].(* redisLib.BoolCmd ).Result (); ! expOk || expErr != nil {
310
347
if expErr != nil {
311
- fmt .Printf ("[Go-Sail] <redisLock> key: %s renewal err: %v\n " , keys [index ].key , expErr )
348
+ fmt .Printf ("[Go-Sail] <redisLock> key: %s renewal err: %v\n " , validKeys [index ].key , expErr )
312
349
}
313
- keys [index ].ctrl .cancel () //续期失败也要清理掉
314
- delete (states .listeners , keys [index ].key )
350
+ validKeys [index ].ctrl .cancel () //续期失败也要清理掉
351
+ delete (states .listeners , validKeys [index ].key )
315
352
}
316
353
}
354
+ //清理已经过期的
355
+ for index := range invalidKeys {
356
+ invalidKeys [index ].ctrl .cancel () //保险的再次调用以确保触发ctx.Done
357
+ delete (states .listeners , invalidKeys [index ].key )
358
+ }
317
359
states .mux .Unlock ()
318
360
}
319
361
@@ -327,6 +369,16 @@ func (rl *redisLockerImpl) startRenewalScheduler() {
327
369
}()
328
370
}
329
371
372
+ // 预检是否可以执行锁定任务
373
+ //
374
+ // 此操作属于本地(堆栈)检测
375
+ func canDoLockPreflight (key string ) bool {
376
+ states .mux .RLock ()
377
+ defer states .mux .RUnlock ()
378
+ _ , exist := states .listeners [key ]
379
+ return ! exist
380
+ }
381
+
330
382
// redis操作超时控制
331
383
func withRedisExecuteTimeout () (context.Context , context.CancelFunc ) {
332
384
return context .WithTimeout (context .Background (), redisExecuteTimeout )
@@ -335,9 +387,26 @@ func withRedisExecuteTimeout() (context.Context, context.CancelFunc) {
335
387
var (
336
388
hostname , _ = os .Hostname () //主机名称
337
389
ip , _ = IP ().GetLocal () //主机ip
390
+ processId = os .Getpid () //进程id
338
391
)
339
392
340
393
// 锁的持有者信息
341
394
func lockerValue () string {
342
- return fmt .Sprintf ("lockedAt:%s@%s(%s)" , hostname , ip , time .Now ().Format ("2006-01-02T15:04:05.000000Z" ))
395
+ return fmt .Sprintf ("lockedAt:%s@%s<%d>(%s)" ,
396
+ hostname , ip , processId , time .Now ().Format ("2006-01-02T15:04:05.000000Z" ))
397
+ }
398
+
399
+ // 持有者一致性检测
400
+ //
401
+ // # 注意:
402
+ //
403
+ // 一致性检测以【机器主机名+ip+进程id】为判断依据,
404
+ //
405
+ // 这样设计为的是锁只能被【持有者自己】释放,若进程
406
+ //
407
+ // down掉,堆栈中的自动维护信息会被释放,
408
+ //
409
+ // 因此即便是重新启动获取到了相同的进程号,也不受影响。
410
+ func holderConsistencyDetection (lockerValue string ) bool {
411
+ return strings .HasPrefix (lockerValue , fmt .Sprintf ("lockedAt:%s@%s<%d>(" , hostname , ip , processId ))
343
412
}
0 commit comments