Zookeeper分布式锁实现(zk怎么实现分布式锁)

如题,使用zookeeper实现分布式锁时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。下面先来聊聊其实现的核心思想:首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核

大家好,又见面了,我是你们的朋友全栈君。

如题,使用zookeeper实现分布式锁

时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。

下面先来聊聊其实现的核心思想:

首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核心点。

1、创建一个节点lock作为锁的根节点,当有线程需要抢锁的时候在该节点下创建一个临时有序节点

2、节点创建成功后,获取当前根节点下的所有孩子节点列表,并将自己阻塞住

3、因为获取到的子节点列表是无序的,所以需要先对子节点进行排序,然后判断自己是不是当前的第一个子节点,如果自己是第一个子节点说明抢到锁可以执行业务代码

4、如果自己不是第一个子节点,获取到自己当前在列表中索引,去监听自己的前一个节点,也就是自己的索引  index -1   (这里的监听前一个节点为核心,如果我们去监听根节点,那么一个节点的删除就需要回调所有的子节点代价太大,所以是监听前一个节点)

5、当获得锁的节点执行释放锁,也就是删除自己的节点时,后边监听的节点收到回调事件后再去获取所有的子节点,再去判断自己是不是第一个,执行抢锁操作

以上几步,便是实现分布式锁的核心思想。下面将实现的代码贴出来。

代码部分

1、pom.xml

 <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>

2、ZKUtils.java  获取zookeeper实例的工具类

package com.bx.wx.system.zk;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/16
 */
public class ZKUtils {

    private static ZooKeeper zooKeeper;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("ip:2181,ip:2182,ip:2183/testConfig/lock", 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                Event.KeeperState state = event.getState();
                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }

            }
        });
        countDownLatch.await();
        return zooKeeper;
    }
}

3、ZKLockUtils.java  实现了分布式锁的工具类

package com.bx.wx.system.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZKLockUtils implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback, AsyncCallback.Children2Callback, AsyncCallback.StringCallback {

    /**
     * 这里可通过set方法或者构造方法,传入zooKeeper,
     */
    private ZooKeeper zooKeeper;

    /**
     * 当前节点的path
     */
    private String pathName;

    /**
     * 当前线程的名字,便于查看
     */
    private String threadName;

    /**
     * 用于获取不到锁时候阻塞
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * v 1.0
     * 加锁方法
     * 基础版本,功能实现了,后续再进行优化吧
     */
    public  void lock(){
        /**
         * 思路.....
         * 1、在锁目录下创建自己的节点,临时有序节点
         * 2、获取所有的孩子节点、判断自己是不是第一个
         * 3、如果自己是第一个,则加锁成功,执行业务代码
         * 4、如果自己不是第一个,watch自己的前一个节点
         * 5、当第一个节点,也就是获取锁的执行完之后,删除自己的节点
         * 6、第二个就能监听到,从而继续执行获取所有孩子节点,判断自己是不是第一个的操作
         */
        try {
            zooKeeper.create("/lock", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
            //当前线程阻塞,进行抢锁
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 解锁方法
     * 执行完业务后,删除掉自己的节点即可 version为-1  忽略数据版本
     */
    public  void ulock(){
        //删除自己的节点
        try {
            zooKeeper.delete(pathName,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    /**
     * Children2Callback 接口
     * 获取节点下所有孩子
     * 实现分布式锁的核心点
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        if(children != null && children.size()>0){
            //对节点进行排序
            Collections.sort(children);
            String currentPath = pathName.substring(1);
            //查询自己是第几个
            int index = children.indexOf(currentPath);
            //判断自己是不是第一个
            if(index<1){
                try {
                    //如果自己是第一个,则认为抢到了锁
                    System.out.println(threadName+"抢到锁了..");
                    zooKeeper.setData("/",threadName.getBytes(),-1);
                    countDownLatch.countDown();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }else{
                //只监听自己的前一个
                zooKeeper.exists("/"+children.get(index-1),this,this,"abc");
            }
        }
    }

    /**
     * 节点创建成功时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        pathName = name;
        System.out.println(threadName+"-节点创建成功:"+pathName);
        //处的watch为false,表示不需要对根节点下的所有节点进行watch,我们只需要监听自己的前一个即可
        zooKeeper.getChildren("/",false,this,"abc");
    }

    /**
     * DataCallback接口
     * 当getdata有数据时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param data
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        //TODO
    }

    /**
     * StatCallback接口
     * 判断节点是否存在时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //TODO
        /*if(stat != null){
            zooKeeper.getData("/lock",this,this,"abc");
        }*/
    }

    /**
     * Watcher 接口
     * 节点的事件回调
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        Event.EventType eventType = event.getType();
        String path = event.getPath();
        switch (eventType) {
            case None:
                break;
            case NodeCreated:
                System.out.println("节点被创建...");
                break;
            case NodeDeleted:
                //当前一个节点被删除,判断自己是不是第一个
                System.out.println(path+"-节点被删除...");
                //执行获取所有孩子节点的操作
                zooKeeper.getChildren("/",false,this,"abc");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }


}

4、ZkLockTest.java 测试类

package com.bx.wx.system.zk;

import org.apache.zookeeper.ZooKeeper;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZkLockTest {

    public static void main(String[] args)throws Exception {
        ZooKeeper zooKeeper = ZKUtils.getZooKeeper();
        //模拟多线程请求
        for (int i = 0; i < 5; i++) {
            String threadName = "LockThread-"+i;
            new Thread(()->{
                ZKLockUtils lockUtils = new ZKLockUtils();
                lockUtils.setZooKeeper(zooKeeper);
                lockUtils.setThreadName(threadName);
                //加锁
                lockUtils.lock();
                System.out.println(Thread.currentThread().getName()+"正在执行任务");
                //模拟执行任务
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //解锁
                lockUtils.ulock();
            },threadName).start();
        }
    }
}

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/128665.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 【QGIS入门实战精品教程】2.1:初识QGIS软件[通俗易懂]

    【QGIS入门实战精品教程】2.1:初识QGIS软件[通俗易懂]从今天开始,我们一起来学习一款免费开源、对机器要求低、功能强大的GIS软件:QGIS!一、QGIS简介QGIS(原称QuantumGIS)是一个自由软件的桌面GIS软件。它提供数据的显示、编辑和分析功能。QGIS是一个用户界面友好的桌面地理信息系统,可运行在Linux、Unix、MacOSX和Windows等平台之上。QGIS是基于Qt,使用C++开发的一个用户界面友好、跨平台的免费开源版桌面地理信息系统。二、QGIS软件的主要特点支持多种GIS数据文件格式。通过GDAL..

  • 常量表达式是什么_const常量

    常量表达式是什么_const常量常量表达式值(constant-expressionvalue)。通常情况下,常量表达式值必须被一个常量表达式赋值,而跟常量表达式函数一样,常量表达式值在使用前必须被初始化。一、常量表达式1.1运行时常量性与编译时常量性在C++中,我们常常会遇到常量的概念。常量表示该值不可修改,通常是通过const关键字来修饰的。比如:constinti=3;const还可以修饰函数参数、函数返回值、函数本身、类等。在不同的使用条件下,const有不同的意义,不过大多数情况下,const描述的都

  • yum卸载软件

    yum卸载软件yum可以安装软件,也可以卸载软件yum安装软件的命令为:yuminstallfileNameyum卸载软件的命令:sudoyumremovedocker\docker-client\docker-client-latest\docker-common…

  • 笔记29-MySQL多表&事务

    笔记29-MySQL多表&事务今日内容1.多表查询2.事务3.DCL多表查询:*查询语法: select 列名列表 from 表名列表 where….*准备sql #创建部门表 CREATETABLEdept( idINTPRIMARYKEYAUTO_INCREMENT, NAMEVARCHAR(20) ); INSERTINTOdept(NAME)VALUES(‘开发部’),(‘市场部’),(‘财务部’); #创建员工表 CREATETAB

  • PS图层混合模式实例详解

    PS图层混合模式实例详解PS中的很多概念都和CoreGraphics中的概念相通,比如蒙版、路径、裁剪、混合模式等等。如果你对CoreGraphics中的混合模式不太理解,阅读本篇文章能让你对CoreGraphics中

  • 软件工程导论(第六版) 思维导图全[通俗易懂]

    软件工程导论(第六版) 思维导图全[通俗易懂]软件工程导论(第六版)思维导图全软件工程软件工程概述可行性研究需求分析总体设计详细设计实现维护面…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号