Zookeeper分布式锁实现(zk怎么实现分布式锁)

如题,使用zookeeper实现分布式锁时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。下面先来聊聊其实现的核心思想:首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核

大家好,又见面了,我是你们的朋友全栈君。

如题,使用zookeeper实现分布式锁

时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。

下面先来聊聊其实现的核心思想:

首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核心点。

1、创建一个节点lock作为锁的根节点,当有线程需要抢锁的时候在该节点下创建一个临时有序节点

2、节点创建成功后,获取当前根节点下的所有孩子节点列表,并将自己阻塞住

3、因为获取到的子节点列表是无序的,所以需要先对子节点进行排序,然后判断自己是不是当前的第一个子节点,如果自己是第一个子节点说明抢到锁可以执行业务代码

4、如果自己不是第一个子节点,获取到自己当前在列表中索引,去监听自己的前一个节点,也就是自己的索引  index -1   (这里的监听前一个节点为核心,如果我们去监听根节点,那么一个节点的删除就需要回调所有的子节点代价太大,所以是监听前一个节点)

5、当获得锁的节点执行释放锁,也就是删除自己的节点时,后边监听的节点收到回调事件后再去获取所有的子节点,再去判断自己是不是第一个,执行抢锁操作

以上几步,便是实现分布式锁的核心思想。下面将实现的代码贴出来。

代码部分

1、pom.xml

 <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>

2、ZKUtils.java  获取zookeeper实例的工具类

package com.bx.wx.system.zk;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/16
 */
public class ZKUtils {

    private static ZooKeeper zooKeeper;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("ip:2181,ip:2182,ip:2183/testConfig/lock", 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                Event.KeeperState state = event.getState();
                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }

            }
        });
        countDownLatch.await();
        return zooKeeper;
    }
}

3、ZKLockUtils.java  实现了分布式锁的工具类

package com.bx.wx.system.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZKLockUtils implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback, AsyncCallback.Children2Callback, AsyncCallback.StringCallback {

    /**
     * 这里可通过set方法或者构造方法,传入zooKeeper,
     */
    private ZooKeeper zooKeeper;

    /**
     * 当前节点的path
     */
    private String pathName;

    /**
     * 当前线程的名字,便于查看
     */
    private String threadName;

    /**
     * 用于获取不到锁时候阻塞
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * v 1.0
     * 加锁方法
     * 基础版本,功能实现了,后续再进行优化吧
     */
    public  void lock(){
        /**
         * 思路.....
         * 1、在锁目录下创建自己的节点,临时有序节点
         * 2、获取所有的孩子节点、判断自己是不是第一个
         * 3、如果自己是第一个,则加锁成功,执行业务代码
         * 4、如果自己不是第一个,watch自己的前一个节点
         * 5、当第一个节点,也就是获取锁的执行完之后,删除自己的节点
         * 6、第二个就能监听到,从而继续执行获取所有孩子节点,判断自己是不是第一个的操作
         */
        try {
            zooKeeper.create("/lock", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
            //当前线程阻塞,进行抢锁
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 解锁方法
     * 执行完业务后,删除掉自己的节点即可 version为-1  忽略数据版本
     */
    public  void ulock(){
        //删除自己的节点
        try {
            zooKeeper.delete(pathName,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    /**
     * Children2Callback 接口
     * 获取节点下所有孩子
     * 实现分布式锁的核心点
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        if(children != null && children.size()>0){
            //对节点进行排序
            Collections.sort(children);
            String currentPath = pathName.substring(1);
            //查询自己是第几个
            int index = children.indexOf(currentPath);
            //判断自己是不是第一个
            if(index<1){
                try {
                    //如果自己是第一个,则认为抢到了锁
                    System.out.println(threadName+"抢到锁了..");
                    zooKeeper.setData("/",threadName.getBytes(),-1);
                    countDownLatch.countDown();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }else{
                //只监听自己的前一个
                zooKeeper.exists("/"+children.get(index-1),this,this,"abc");
            }
        }
    }

    /**
     * 节点创建成功时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        pathName = name;
        System.out.println(threadName+"-节点创建成功:"+pathName);
        //处的watch为false,表示不需要对根节点下的所有节点进行watch,我们只需要监听自己的前一个即可
        zooKeeper.getChildren("/",false,this,"abc");
    }

    /**
     * DataCallback接口
     * 当getdata有数据时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param data
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        //TODO
    }

    /**
     * StatCallback接口
     * 判断节点是否存在时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //TODO
        /*if(stat != null){
            zooKeeper.getData("/lock",this,this,"abc");
        }*/
    }

    /**
     * Watcher 接口
     * 节点的事件回调
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        Event.EventType eventType = event.getType();
        String path = event.getPath();
        switch (eventType) {
            case None:
                break;
            case NodeCreated:
                System.out.println("节点被创建...");
                break;
            case NodeDeleted:
                //当前一个节点被删除,判断自己是不是第一个
                System.out.println(path+"-节点被删除...");
                //执行获取所有孩子节点的操作
                zooKeeper.getChildren("/",false,this,"abc");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }


}

4、ZkLockTest.java 测试类

package com.bx.wx.system.zk;

import org.apache.zookeeper.ZooKeeper;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZkLockTest {

    public static void main(String[] args)throws Exception {
        ZooKeeper zooKeeper = ZKUtils.getZooKeeper();
        //模拟多线程请求
        for (int i = 0; i < 5; i++) {
            String threadName = "LockThread-"+i;
            new Thread(()->{
                ZKLockUtils lockUtils = new ZKLockUtils();
                lockUtils.setZooKeeper(zooKeeper);
                lockUtils.setThreadName(threadName);
                //加锁
                lockUtils.lock();
                System.out.println(Thread.currentThread().getName()+"正在执行任务");
                //模拟执行任务
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //解锁
                lockUtils.ulock();
            },threadName).start();
        }
    }
}

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/128665.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 史上最全ASCII码对照表0-255(%d)

    史上最全ASCII码对照表0-255(%d)十进制代码 十六进制代码 MCS字符或缩写 DEC多国字符名 ASCII控制字符1 0 0 NUL 空字符 1 1 SOH 标…

  • python实现樱花[通俗易懂]

    python实现樱花[通俗易懂]python实现樱花代码如下:fromturtleimport*fromrandomimport*frommathimport*deftree(n,l):pd()#下笔#阴影效果t=cos(radians(heading()+45))/8+0.25pencolor(t,t,t)pensize(n/3)forward(l)#画树枝if

  • 坚果课堂回顾:团队项目管理&SOP打造顶尖执行力

    坚果课堂回顾:团队项目管理&SOP打造顶尖执行力【摘要】:已经非常努力高效的利用时间了,可为什么事情还是做不完?todolist产生的速度,超过能处理完成事情的速度? 作为知识密集型行业的代表人群,律师行业始终在探索如何高效利用时间,在有限的时间内创造出更多价值。在律师行业效率变革的驱动下,秉承效率至上的云盘服务提供商坚果云,一直致力于助力律师行业找到高效工作方式。在6月12日…

  • ZTE E700 自用感受及软件推荐

    ZTE E700 自用感受及软件推荐ZTEE700自用感受及软件推荐声明声明:此帖是转的  前两天看到移动08年新出的承诺话费换手机业务,换了个折扣最高的ZTEE700,首先声明我用的手机并不是很多,所以说错了希望大家见谅。这个机器我非常满意,非常好用!质量没得说,我从手里掉到水泥地上机器翻了3个跟头拿起来一点问题都没有。内置功能很强大,不过扩展功能则比较残废,好在机器本身的功能就能满足90%以上人的需求了,游戏…

  • msfconsole search_msfconsole下载

    msfconsole search_msfconsole下载msfconsole启动msf控制台后└─msfconsole2⨯…dBBBBBBbdBBBPdBBBBBBPdBBBBBb.o’dB’BBPdB’dB’dB’d…

  • RPC协议及常用框架

    RPC协议及常用框架https://www.jianshu.com/p/8ba4b7b834aaRPC协议RPC:远程过程调用,原则上来说系统间跨进程的调用都属于RPC范畴RMI/HTTP/dubbo/SpringCloud/thriftRPC框架如何实现分布式环境下的远程调用在一个典型的RPC的使用场景中,包含了服务发现,负载,容错,网络传输,序列化等组件,其中RPC协议指明了程序如何进行网络传输和序列化。RPC协议的组成RPC协议的组成1.地址:服务提供者地址2.端口:.

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号