zookeeper 基础
[[zookeeper基础.md]]
你可以认为 ZooKeeper = 文件系统 + 监听通知机制
- 持久化目录节点 PERSISTENT:客户端与zookeeper断开连接后,该节点依旧存在。
- 持久化顺序编号目录节点 PERSISTENT_SEQUENTIAL:客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号。
- 临时目录节点 EPHEMERAL:客户端与zookeeper断开连接后,该节点被删除。
- 临时顺序编号目录节点 EPHEMERAL_SEQUENTIAL:客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。
基于 zookeeper的瞬时节点 实现分布式锁
基本概念
瞬时节点 不能有子节点, 会话连接一旦断开,瞬时节点就会删除
持久节点 可以有子节点
安装和下载
- 去 zookeeper.apache.org 下载
- 解压文件 并且配置 zoo.cfg
- 运行 zookeeper , cli 客户端连接
zookeeper 观察器
检测 子节点的变化,删除, 如果发生变化, 立刻通知程序 节点发生变化。
- 可设置的观察器的3个方法:
- getData
- getChildren
- exists
- 节点数据发生变化,发送给客户端
- 观察器只能使用 一次, 再次监控 要重新设置。
实现原理
- zookeeper 的瞬时有序节点特性
- 多线程并发创建瞬时节点时,得到有序的序列
- 序号最小的线程获得锁 【 相当于自己模拟了一个 AQS 队列】
- 其他检查监听自己序号的前一个序号
- 创建节点的时候就确定了 获取锁的顺序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
package com.zyu.boot.demo.utils.lock.zookeeper;
import com.zyu.boot.demo.utils.lock.AbstractLockImpl;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 基于互相监听机制的zk分布式锁
*/
public class DistributeLock2 extends AbstractLockImpl {
private ZkClient zkClient = new ZkClient("172.17.0.2:2181");
private final String PATH = "/path";
private String currentPath;//当前节点
private String beforePath;//当前节点的前一个节点
private CountDownLatch countDownLatch = null;
public DistributeLock2() {
if (!zkClient.exists(PATH)) {
zkClient.createPersistent(PATH);
}
}
@Override
public boolean tryLock() {
//如果currentPath为空表示为第一次尝试加锁
if(currentPath == null || currentPath.length() <= 0){
//创建临时顺序节点
currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
}
//获取该路径下所有的子节点,并排序
List<String> childrens = zkClient.getChildren(PATH);
Collections.sort(childrens);
//将最小的节点和自己做比较,如果相等,则认为拿到锁,直接返回
if (currentPath.equals(PATH + "/" + childrens.get(0))) {
return true;
}else{
//不等则拿锁失败,同时找到比自己大一点的节点
int index = Collections.binarySearch(childrens,currentPath.replace(PATH + "/", ""));
beforePath = PATH + "/" + childrens.get(index - 1);
}
return false;
}
@Override
public void waitLock() {
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (countDownLatch != null) {
// 这样就可以通知 那边 阻塞唤醒
countDownLatch.countDown();
}
}
};
//监听前面一个节点
zkClient.subscribeDataChanges(beforePath, iZkDataListener);
if (zkClient.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消节点的监听事件
zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
}
@Override
public void unlock() {
//删除lock节点
zkClient.delete(currentPath);
zkClient.close();
}
}
|