zookeeper 基础

[[zookeeper基础.md]]

你可以认为 ZooKeeper = 文件系统 + 监听通知机制

  • 持久化目录节点 PERSISTENT:客户端与zookeeper断开连接后,该节点依旧存在。
  • 持久化顺序编号目录节点 PERSISTENT_SEQUENTIAL:客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号。
  • 临时目录节点 EPHEMERAL:客户端与zookeeper断开连接后,该节点被删除。
  • 临时顺序编号目录节点 EPHEMERAL_SEQUENTIAL:客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。

基于 zookeeper的瞬时节点 实现分布式锁

基本概念

瞬时节点 不能有子节点, 会话连接一旦断开,瞬时节点就会删除

持久节点 可以有子节点

安装和下载

  1. 去 zookeeper.apache.org 下载
  2. 解压文件 并且配置 zoo.cfg
  3. 运行 zookeeper , cli 客户端连接

zookeeper 观察器

检测 子节点的变化,删除, 如果发生变化, 立刻通知程序 节点发生变化。

  • 可设置的观察器的3个方法:
    • getData
    • getChildren
    • exists
  • 节点数据发生变化,发送给客户端
  • 观察器只能使用 一次, 再次监控 要重新设置。

实现原理

  • zookeeper 的瞬时有序节点特性
  • 多线程并发创建瞬时节点时,得到有序的序列
  • 序号最小的线程获得锁 【 相当于自己模拟了一个 AQS 队列
  • 其他检查监听自己序号的前一个序号
  • 创建节点的时候就确定了 获取锁的顺序
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.zyu.boot.demo.utils.lock.zookeeper;

import com.zyu.boot.demo.utils.lock.AbstractLockImpl;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 基于互相监听机制的zk分布式锁
 */
public class DistributeLock2 extends AbstractLockImpl {

    private ZkClient zkClient = new ZkClient("172.17.0.2:2181");

    private final String PATH = "/path";

    private String currentPath;//当前节点
    private String beforePath;//当前节点的前一个节点

    private CountDownLatch countDownLatch = null;

    public DistributeLock2() {
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    public boolean tryLock() {
        //如果currentPath为空表示为第一次尝试加锁
        if(currentPath == null || currentPath.length() <= 0){
            //创建临时顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
        }
        //获取该路径下所有的子节点,并排序
        List<String> childrens = zkClient.getChildren(PATH);
        Collections.sort(childrens);
        //将最小的节点和自己做比较,如果相等,则认为拿到锁,直接返回
        if (currentPath.equals(PATH + "/" + childrens.get(0))) {
            return true;
        }else{
            //不等则拿锁失败,同时找到比自己大一点的节点
            int index = Collections.binarySearch(childrens,currentPath.replace(PATH + "/", ""));
            beforePath = PATH + "/" + childrens.get(index - 1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    // 这样就可以通知 那边 阻塞唤醒
                    countDownLatch.countDown();
                }
            }
        };
        //监听前面一个节点
        zkClient.subscribeDataChanges(beforePath, iZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //取消节点的监听事件
        zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
    }

    @Override
    public void unlock() {
        //删除lock节点
        zkClient.delete(currentPath);
        zkClient.close();
    }
}