大家好,又见面了,我是你们的朋友全栈君。
如题,使用zookeeper实现分布式锁
时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。
下面先来聊聊其实现的核心思想:
首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核心点。
1、创建一个节点lock作为锁的根节点,当有线程需要抢锁的时候在该节点下创建一个临时有序节点
2、节点创建成功后,获取当前根节点下的所有孩子节点列表,并将自己阻塞住
3、因为获取到的子节点列表是无序的,所以需要先对子节点进行排序,然后判断自己是不是当前的第一个子节点,如果自己是第一个子节点说明抢到锁可以执行业务代码
4、如果自己不是第一个子节点,获取到自己当前在列表中索引,去监听自己的前一个节点,也就是自己的索引 index -1 (这里的监听前一个节点为核心,如果我们去监听根节点,那么一个节点的删除就需要回调所有的子节点代价太大,所以是监听前一个节点)
5、当获得锁的节点执行释放锁,也就是删除自己的节点时,后边监听的节点收到回调事件后再去获取所有的子节点,再去判断自己是不是第一个,执行抢锁操作
以上几步,便是实现分布式锁的核心思想。下面将实现的代码贴出来。
代码部分
1、pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
2、ZKUtils.java 获取zookeeper实例的工具类
package com.bx.wx.system.zk;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @创建人 z.bx
* @创建时间 2021/5/16
*/
public class ZKUtils {
private static ZooKeeper zooKeeper;
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static ZooKeeper getZooKeeper() throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("ip:2181,ip:2182,ip:2183/testConfig/lock", 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
Event.KeeperState state = event.getState();
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
});
countDownLatch.await();
return zooKeeper;
}
}
3、ZKLockUtils.java 实现了分布式锁的工具类
package com.bx.wx.system.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @创建人 z.bx
* @创建时间 2021/5/21
*/
public class ZKLockUtils implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback, AsyncCallback.Children2Callback, AsyncCallback.StringCallback {
/**
* 这里可通过set方法或者构造方法,传入zooKeeper,
*/
private ZooKeeper zooKeeper;
/**
* 当前节点的path
*/
private String pathName;
/**
* 当前线程的名字,便于查看
*/
private String threadName;
/**
* 用于获取不到锁时候阻塞
*/
private CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* v 1.0
* 加锁方法
* 基础版本,功能实现了,后续再进行优化吧
*/
public void lock(){
/**
* 思路.....
* 1、在锁目录下创建自己的节点,临时有序节点
* 2、获取所有的孩子节点、判断自己是不是第一个
* 3、如果自己是第一个,则加锁成功,执行业务代码
* 4、如果自己不是第一个,watch自己的前一个节点
* 5、当第一个节点,也就是获取锁的执行完之后,删除自己的节点
* 6、第二个就能监听到,从而继续执行获取所有孩子节点,判断自己是不是第一个的操作
*/
try {
zooKeeper.create("/lock", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
//当前线程阻塞,进行抢锁
countDownLatch.await();
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 解锁方法
* 执行完业务后,删除掉自己的节点即可 version为-1 忽略数据版本
*/
public void ulock(){
//删除自己的节点
try {
zooKeeper.delete(pathName,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* Children2Callback 接口
* 获取节点下所有孩子
* 实现分布式锁的核心点
* @param rc
* @param path
* @param ctx
* @param children
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
if(children != null && children.size()>0){
//对节点进行排序
Collections.sort(children);
String currentPath = pathName.substring(1);
//查询自己是第几个
int index = children.indexOf(currentPath);
//判断自己是不是第一个
if(index<1){
try {
//如果自己是第一个,则认为抢到了锁
System.out.println(threadName+"抢到锁了..");
zooKeeper.setData("/",threadName.getBytes(),-1);
countDownLatch.countDown();
}catch (Exception e){
e.printStackTrace();
}
}else{
//只监听自己的前一个
zooKeeper.exists("/"+children.get(index-1),this,this,"abc");
}
}
}
/**
* 节点创建成功时的回调
* @param rc
* @param path
* @param ctx
* @param name
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
pathName = name;
System.out.println(threadName+"-节点创建成功:"+pathName);
//处的watch为false,表示不需要对根节点下的所有节点进行watch,我们只需要监听自己的前一个即可
zooKeeper.getChildren("/",false,this,"abc");
}
/**
* DataCallback接口
* 当getdata有数据时的回调
* @param rc
* @param path
* @param ctx
* @param data
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
//TODO
}
/**
* StatCallback接口
* 判断节点是否存在时的回调
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
//TODO
/*if(stat != null){
zooKeeper.getData("/lock",this,this,"abc");
}*/
}
/**
* Watcher 接口
* 节点的事件回调
* @param event
*/
@Override
public void process(WatchedEvent event) {
Event.EventType eventType = event.getType();
String path = event.getPath();
switch (eventType) {
case None:
break;
case NodeCreated:
System.out.println("节点被创建...");
break;
case NodeDeleted:
//当前一个节点被删除,判断自己是不是第一个
System.out.println(path+"-节点被删除...");
//执行获取所有孩子节点的操作
zooKeeper.getChildren("/",false,this,"abc");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
4、ZkLockTest.java 测试类
package com.bx.wx.system.zk;
import org.apache.zookeeper.ZooKeeper;
/**
* @创建人 z.bx
* @创建时间 2021/5/21
*/
public class ZkLockTest {
public static void main(String[] args)throws Exception {
ZooKeeper zooKeeper = ZKUtils.getZooKeeper();
//模拟多线程请求
for (int i = 0; i < 5; i++) {
String threadName = "LockThread-"+i;
new Thread(()->{
ZKLockUtils lockUtils = new ZKLockUtils();
lockUtils.setZooKeeper(zooKeeper);
lockUtils.setThreadName(threadName);
//加锁
lockUtils.lock();
System.out.println(Thread.currentThread().getName()+"正在执行任务");
//模拟执行任务
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//解锁
lockUtils.ulock();
},threadName).start();
}
}
}
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/128665.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...