redis分布式锁

参考项目

https://github.com/go-redsync/redsync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	// Create a pool with go-redis (or redigo) which is the pool redisync will
	// use while communicating with Redis. This can also be any pool that
	// implements the `redis.Pool` interface.
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})
	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

	// Create an instance of redisync to be used to obtain a mutual exclusion
	// lock.
	rs := redsync.New(pool)

	// Obtain a new mutex by using the same name for all instances wanting the
	// same lock.
	mutexname := "my-global-mutex"
	mutex := rs.NewMutex(mutexname)

	// Obtain a lock for our given mutex. After this is successful, no one else
	// can obtain the same lock (the same mutex name) until we unlock it.
	if err := mutex.Lock(); err != nil {
		panic(err)
	}

	// Do your work that requires the lock.

	// Release the lock so other processes or threads can obtain a lock.
	if ok, err := mutex.Unlock(); !ok || err != nil {
		panic("unlock failed")
	}
}

对分布式锁的解决方案做的调研

分布式锁解决方案调研

一个分布式锁需要的基本功能如下:

\1. 互斥性: 只能有一个客户端获得锁

\2. 不会发生死锁: 客户端在持有锁的时候突然 crash掉,最终可以让其他客户端获得锁

\3. 容错性: Redis 集群中大部分的节点正常运行,客户端就可以进行加锁和解锁操作

Java 框架的解决方案:

基于zookeeper解决方法

采用临时有序节点 实现节点监听

基于Redis的解决方法

参考 redission框架

https://www.jianshu.com/p/0d3f5395ceaf

基本的功能:

\1. 可重入锁: 获得锁的线程不会锁住自己

\2. 超时时间: 获得锁的进程 如果宕机了, Redis 中存的锁如果不设置过期时间,就会发生死锁,

\3. 看门狗机制, 【如果方法一直在占用锁执行任务, 可以自动延续锁的占用时间】

\4. 红锁算法 【Redis集群下,采用多数原则,大多数节点上锁成功,才能任务是上锁成功】

参考:

https://www.cnblogs.com/jelly12345/p/14699492.html

参考gitee上已经开源的看门狗机制的实现

https://gitee.com/rmy_20/watch-dog-demo

基于golang的 红锁算法实现:

https://github.com/turbo-q/redlock/blob/master/redlock/redlock.go

上锁成功数 大于 一半才算上锁成功

如果需要 tryLock 做超时控制,可以 在此基础上做一个封装即可

golang分布式锁 超时控制的实现

https://github.com/bsm/redislock

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main

//使用 GitHub开源的分布式锁库
import (
	"context"
	"errors"
	"github.com/bsm/redislock"
	"github.com/go-redis/redis/v8"
	"log"
	"time"
	//"log"
)

var (
	//获取锁时间超时
	WaitLockTimeout = errors.New("wait for lock timeout")
)

type WatchDogLock struct {
	WaitLockTime time.Duration   // 等待锁的时间,超时自动退出
	KeyTTL       time.Duration   // key的超时时间
	LockKey      string          //lock key
	realLock     *redislock.Lock //实现的类库
	//closeSignal  context.Context //  用来判断是否关闭锁了
	client *redis.Client
	// for cancel
	cancelWatchDog func()
	//ctx            context.Context
}

func NewWatchDogLock(waitLockTime time.Duration, keyTTL time.Duration, lockKey string, cli *redis.Client) *WatchDogLock {

	return &WatchDogLock{
		WaitLockTime: waitLockTime,
		KeyTTL:       keyTTL,
		LockKey:      lockKey,
		client:       cli,
	}
}
func (w *WatchDogLock) _close() error {
	//关闭看门狗
	if w.cancelWatchDog != nil {
		//log.Println("cancel")
		w.cancelWatchDog()
	}
	//删除锁
	if w.realLock != nil {
		//log.Println("release")
		err := w.realLock.Release(context.Background())
		return err
	}
	return nil
}
func (w *WatchDogLock) TryLock() (ok bool, err error) {
	locker := redislock.New(w.client)
	var LockWaitTimeout = time.After(w.WaitLockTime)
	//设置等待锁的时间
	//ctx := context.Background()
	var lock *redislock.Lock
tryLock:
	for {
		select {
		case <-LockWaitTimeout:
			log.Println("获取锁失败,已到超时时间")
			return false, WaitLockTimeout

		default:
			lock, err = locker.Obtain(context.Background(), w.LockKey, w.KeyTTL, nil)
			//无法获得锁
			if err == redislock.ErrNotObtained {
				time.Sleep(time.Millisecond * 600)

				//重试
				continue tryLock
			} else if err != nil {
				log.Println("lock unknown status %+v", err)
				return false, nil
			}
			//lock success
			break tryLock

		}

	}
	// 获得锁成功
	if lock == nil {
		log.Println("未知异常,获取锁失败 lock==nil")
		return false, errors.New("无法获得锁")
	}
	w.realLock = lock
	//自动对锁续期
	var ctx, cancel = context.WithCancel(context.Background())
	//w.ctx = ctx
	w.cancelWatchDog = cancel
	var watchDog = func() {
		for {
			select {
			case <-ctx.Done():
				log.Println("cancel watchdog")
				//被cancel掉了说明任务做完,立刻退出,不要继续加时间了
				return
			default:
				//时间不够了, 再加 4秒时间
				lock.Refresh(ctx, time.Second*30, nil)
				//刷新的话,最好睡一下
				select {
				case <-ctx.Done():
					log.Println("cancel watchdog 109")
					return
				case <-time.After(time.Second * 10):

				}
			}
		}
	}
	go watchDog()

	return true, nil
	//return
}
func (w *WatchDogLock) UnLock() error {

	return w._close()
}

分布式锁改进

下面是对分布式锁的改进

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main

import (
	"context"
	"errors"
	"github.com/bsm/redislock"
	"github.com/go-redis/redis/v8"
	"time"
)

var (
	//获取锁时间超时
	WaitLockTimeout = errors.New("wait for lock timeout")
)

type waitLock struct {
	//退出协程
	unlockNotice chan byte
	//等待协程退出
	waitExit chan byte
	key      string

	cli  *redis.Client
	lock *redislock.Lock
}

func NewWaitLock(key string, cli *redis.Client) *waitLock {
	var s waitLock
	s.waitExit = make(chan byte, 1)
	s.unlockNotice = make(chan byte, 1)
	s.key = key
	//s.keyttl = time.Minute
	s.cli = cli

	return &s
}
func (w *waitLock) Unlock() error {
	if w.unlockNotice != nil {
		w.unlockNotice <- 1
	}
	<-w.waitExit
	return w.lock.Release(context.TODO())
}

func (w *waitLock) TryLock(pctx context.Context, waitTime time.Duration) error {
	var locker = redislock.New(w.cli)
	var ctx, cancel = context.WithTimeout(pctx, waitTime)
	defer cancel()
trylock:
	for {
		select {
		case <-ctx.Done():
			return WaitLockTimeout
		default:
			res, err := locker.Obtain(ctx, w.key, time.Minute, nil)
			if err == nil {
				w.lock = res
				break trylock
			} else {
				if err == redislock.ErrNotObtained {
					time.Sleep(time.Millisecond*500)
					continue trylock
				} else {

					return err
				}
			}

		}
	}
	// begin lock
	go func() {
		defer func() {
			w.waitExit <- 1
		}()
	addTime:
		for {
			select {
			case <-w.unlockNotice:
				break addTime
			default:
				w.lock.Refresh(context.TODO(), time.Second*30, nil)
				select {
				case <-time.After(time.Second * 10):

					continue addTime
				case <-w.unlockNotice:
					// 立刻返回
					break addTime

				}

			}
		}
	}()
	return nil

}