redis分布式锁
参考项目
https://github.com/go-redsync/redsync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package main
import (
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
// Create a pool with go-redis (or redigo) which is the pool redisync will
// use while communicating with Redis. This can also be any pool that
// implements the `redis.Pool` interface.
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Create an instance of redisync to be used to obtain a mutual exclusion
// lock.
rs := redsync.New(pool)
// Obtain a new mutex by using the same name for all instances wanting the
// same lock.
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)
// Obtain a lock for our given mutex. After this is successful, no one else
// can obtain the same lock (the same mutex name) until we unlock it.
if err := mutex.Lock(); err != nil {
panic(err)
}
// Do your work that requires the lock.
// Release the lock so other processes or threads can obtain a lock.
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}
|
对分布式锁的解决方案做的调研
分布式锁解决方案调研
一个分布式锁需要的基本功能如下:
\1. 互斥性: 只能有一个客户端获得锁
\2. 不会发生死锁: 客户端在持有锁的时候突然 crash掉,最终可以让其他客户端获得锁
\3. 容错性: Redis 集群中大部分的节点正常运行,客户端就可以进行加锁和解锁操作
Java 框架的解决方案:
基于zookeeper解决方法
采用临时有序节点 实现节点监听
基于Redis的解决方法
参考 redission框架
https://www.jianshu.com/p/0d3f5395ceaf
基本的功能:
\1. 可重入锁: 获得锁的线程不会锁住自己
\2. 超时时间: 获得锁的进程 如果宕机了, Redis 中存的锁如果不设置过期时间,就会发生死锁,
\3. 看门狗机制, 【如果方法一直在占用锁执行任务, 可以自动延续锁的占用时间】
\4. 红锁算法 【Redis集群下,采用多数原则,大多数节点上锁成功,才能任务是上锁成功】
参考:
https://www.cnblogs.com/jelly12345/p/14699492.html
参考gitee上已经开源的看门狗机制的实现
https://gitee.com/rmy_20/watch-dog-demo
基于golang的 红锁算法实现:
https://github.com/turbo-q/redlock/blob/master/redlock/redlock.go
上锁成功数 大于 一半才算上锁成功
如果需要 tryLock 做超时控制,可以 在此基础上做一个封装即可
golang分布式锁 超时控制的实现
https://github.com/bsm/redislock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
package main
//使用 GitHub开源的分布式锁库
import (
"context"
"errors"
"github.com/bsm/redislock"
"github.com/go-redis/redis/v8"
"log"
"time"
//"log"
)
var (
//获取锁时间超时
WaitLockTimeout = errors.New("wait for lock timeout")
)
type WatchDogLock struct {
WaitLockTime time.Duration // 等待锁的时间,超时自动退出
KeyTTL time.Duration // key的超时时间
LockKey string //lock key
realLock *redislock.Lock //实现的类库
//closeSignal context.Context // 用来判断是否关闭锁了
client *redis.Client
// for cancel
cancelWatchDog func()
//ctx context.Context
}
func NewWatchDogLock(waitLockTime time.Duration, keyTTL time.Duration, lockKey string, cli *redis.Client) *WatchDogLock {
return &WatchDogLock{
WaitLockTime: waitLockTime,
KeyTTL: keyTTL,
LockKey: lockKey,
client: cli,
}
}
func (w *WatchDogLock) _close() error {
//关闭看门狗
if w.cancelWatchDog != nil {
//log.Println("cancel")
w.cancelWatchDog()
}
//删除锁
if w.realLock != nil {
//log.Println("release")
err := w.realLock.Release(context.Background())
return err
}
return nil
}
func (w *WatchDogLock) TryLock() (ok bool, err error) {
locker := redislock.New(w.client)
var LockWaitTimeout = time.After(w.WaitLockTime)
//设置等待锁的时间
//ctx := context.Background()
var lock *redislock.Lock
tryLock:
for {
select {
case <-LockWaitTimeout:
log.Println("获取锁失败,已到超时时间")
return false, WaitLockTimeout
default:
lock, err = locker.Obtain(context.Background(), w.LockKey, w.KeyTTL, nil)
//无法获得锁
if err == redislock.ErrNotObtained {
time.Sleep(time.Millisecond * 600)
//重试
continue tryLock
} else if err != nil {
log.Println("lock unknown status %+v", err)
return false, nil
}
//lock success
break tryLock
}
}
// 获得锁成功
if lock == nil {
log.Println("未知异常,获取锁失败 lock==nil")
return false, errors.New("无法获得锁")
}
w.realLock = lock
//自动对锁续期
var ctx, cancel = context.WithCancel(context.Background())
//w.ctx = ctx
w.cancelWatchDog = cancel
var watchDog = func() {
for {
select {
case <-ctx.Done():
log.Println("cancel watchdog")
//被cancel掉了说明任务做完,立刻退出,不要继续加时间了
return
default:
//时间不够了, 再加 4秒时间
lock.Refresh(ctx, time.Second*30, nil)
//刷新的话,最好睡一下
select {
case <-ctx.Done():
log.Println("cancel watchdog 109")
return
case <-time.After(time.Second * 10):
}
}
}
}
go watchDog()
return true, nil
//return
}
func (w *WatchDogLock) UnLock() error {
return w._close()
}
|
分布式锁改进
下面是对分布式锁的改进
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package main
import (
"context"
"errors"
"github.com/bsm/redislock"
"github.com/go-redis/redis/v8"
"time"
)
var (
//获取锁时间超时
WaitLockTimeout = errors.New("wait for lock timeout")
)
type waitLock struct {
//退出协程
unlockNotice chan byte
//等待协程退出
waitExit chan byte
key string
cli *redis.Client
lock *redislock.Lock
}
func NewWaitLock(key string, cli *redis.Client) *waitLock {
var s waitLock
s.waitExit = make(chan byte, 1)
s.unlockNotice = make(chan byte, 1)
s.key = key
//s.keyttl = time.Minute
s.cli = cli
return &s
}
func (w *waitLock) Unlock() error {
if w.unlockNotice != nil {
w.unlockNotice <- 1
}
<-w.waitExit
return w.lock.Release(context.TODO())
}
func (w *waitLock) TryLock(pctx context.Context, waitTime time.Duration) error {
var locker = redislock.New(w.cli)
var ctx, cancel = context.WithTimeout(pctx, waitTime)
defer cancel()
trylock:
for {
select {
case <-ctx.Done():
return WaitLockTimeout
default:
res, err := locker.Obtain(ctx, w.key, time.Minute, nil)
if err == nil {
w.lock = res
break trylock
} else {
if err == redislock.ErrNotObtained {
time.Sleep(time.Millisecond*500)
continue trylock
} else {
return err
}
}
}
}
// begin lock
go func() {
defer func() {
w.waitExit <- 1
}()
addTime:
for {
select {
case <-w.unlockNotice:
break addTime
default:
w.lock.Refresh(context.TODO(), time.Second*30, nil)
select {
case <-time.After(time.Second * 10):
continue addTime
case <-w.unlockNotice:
// 立刻返回
break addTime
}
}
}
}()
return nil
}
|