zookeeper分布式锁实现原理(分布式锁怎么实现)

   摘要:本文要使用Zookeeper来实现一个分布式锁,是一个悲观锁。  本文源码请在这里下载:https://github.com/appleappleapple/DistributeLearning一、锁设计  获取锁实现思路:1.首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node2.希望获得锁的客户端在锁目录下创建zno…

大家好,又见面了,我是你们的朋友全栈君。

     摘要:本文要使用Zookeeper来实现一个分布式锁,是一个悲观锁。

    本文源码请在这里下载:https://github.com/appleappleapple/DistributeLearning

一、锁设计

  获取锁实现思路:
1. 首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node
2. 希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);
例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2
3. 当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点
4. 步骤3中获取小于自己的节点不存在 && 最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。
5. 客户端监视(watch)相对自己次小的有序临时节点状态
6. 如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。     

分布锁笔者这里就不做介绍了,来看看整个代码设计的流程图如下

zookeeper分布式锁实现原理(分布式锁怎么实现)

二、代码

接下来我们就开始编程了~

1、DistributedLock接口定义

 

package com.github.distribute.lock;

import java.util.concurrent.TimeUnit;

public interface DistributedLock {

	/**
	 * 尝试获取锁,不进行等待。得到返回true,
	 * 
	 * @return
	 * @throws Exception
	 */
	public boolean tryLock() throws Exception;

	/**
	 * 阻塞等待获取锁
	 * 
	 * @throws Exception
	 */
	public void lock() throws Exception;

	/**
	 * 在规定时间内等待获取锁
	 * 
	 * @param time
	 * @param unit
	 * @return
	 * @throws Exception
	 */
	public boolean lock(long time, TimeUnit unit) throws Exception;

	/**
	 * 释放锁
	 * 
	 * @throws Exception
	 */
	public void unLock() throws Exception;

}

2、部分实现BaseDistributedLock

 

 

package com.github.distribute.zookeeper;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.distribute.lock.DistributedLock;

public abstract class BaseDistributedLock implements DistributedLock {

	private static Logger logger = LoggerFactory.getLogger(BaseDistributedLock.class);

	private ZooKeeper zooKeeper;
	private String rootPath;// 根路径名
	private String lockNamePre;// 锁前缀
	private String currentLockPath;// 用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)
	private static int MAX_RETRY_COUNT = 10;// 最大重试次数
	

	
	/**
	 * 初始化根目录
	 */
	private void init() {
		try {
			Stat stat = zooKeeper.exists(rootPath, false);// 判断一下根目录是否存在
			if (stat == null) {
				zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (Exception e) {
			logger.error("create rootPath error", e);
		}
	}

	/**
	 * 取得锁的排序号
	 * 
	 * @param str
	 * @param lockName
	 * @return
	 */
	private String getLockNodeNumber(String str, String lockName) {
		int index = str.lastIndexOf(lockName);
		if (index >= 0) {
			index += lockName.length();
			return index <= str.length() ? str.substring(index) : "";
		}
		return str;
	}

	/**
	 * 取得锁的排序列表
	 * 
	 * @return
	 * @throws Exception
	 */
	private List<String> getSortedChildren() throws Exception {
		List<String> children = zooKeeper.getChildren(rootPath, false);
		if (children != null && !children.isEmpty()) {
			Collections.sort(children, new Comparator<String>() {
				public int compare(String lhs, String rhs) {
					return getLockNodeNumber(lhs, lockNamePre).compareTo(getLockNodeNumber(rhs, lockNamePre));
				}
			});
		}
		logger.info("sort childRen:{}", children);
		return children;
	}

	/**
	 * 删除锁节点
	 */
	private void deleteLockNode() {
		try {
			zooKeeper.delete(currentLockPath, -1);
		} catch (Exception e) {
			logger.error("unLock error", e);

		}
	}

	/**
	 * 该方法用于判断自己是否获取到了锁,即自己创建的顺序节点在locker的所有子节点中是否最小.如果没有获取到锁,则等待其它客户端锁的释放,
	 * 并且稍后重试直到获取到锁或者超时
	 * 
	 * @param startMillis
	 * @param millisToWait
	 * @param ourPath
	 * @return
	 * @throws Exception
	 */
	private boolean waitToLock(long startMillis, Long millisToWait) throws Exception {

		boolean haveTheLock = false;
		boolean doDelete = false;

		try {
			while (!haveTheLock) {
				logger.info("get Lock Begin");
				// 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
				List<String> children = getSortedChildren();
				String sequenceNodeName = currentLockPath.substring(rootPath.length() + 1);

				// 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
				int ourIndex = children.indexOf(sequenceNodeName);

				/*
				 * 如果在getSortedChildren中没有找到之前创建的[临时]顺序节点,这表示可能由于网络闪断而导致
				 * Zookeeper认为连接断开而删除了我们创建的节点,此时需要抛出异常,让上一级去处理
				 * 上一级的做法是捕获该异常,并且执行重试指定的次数 见后面的 attemptLock方法
				 */
				if (ourIndex < 0) {
					logger.error("not find node:{}", sequenceNodeName);
					throw new Exception("节点没有找到: " + sequenceNodeName);
				}

				// 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
				// 此时当前客户端需要等待其它客户端释放锁,
				boolean isGetTheLock = ourIndex == 0;

				// 如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
				String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);

				if (isGetTheLock) {
					logger.info("get the lock,currentLockPath:{}", currentLockPath);
					haveTheLock = true;
				} else {
					// 如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用CountDownLatch来实现等待
					String previousSequencePath = rootPath.concat("/").concat(pathToWatch);
					final CountDownLatch latch = new CountDownLatch(1);
					final Watcher previousListener = new Watcher() {
						public void process(WatchedEvent event) {
							if (event.getType() == EventType.NodeDeleted) {
								latch.countDown();
							}
						}
					};

					// 如果节点不存在会出现异常
					zooKeeper.exists(previousSequencePath, previousListener);

					// 如果有超时时间,刚到超时时间就返回
					if (millisToWait != null) {
						millisToWait -= (System.currentTimeMillis() - startMillis);
						startMillis = System.currentTimeMillis();
						if (millisToWait <= 0) {
							doDelete = true; // timed out - delete our node
							break;
						}

						latch.await(millisToWait, TimeUnit.MICROSECONDS);
					} else {
						latch.await();
					}
				}
			}
		} catch (Exception e) {
			// 发生异常需要删除节点
			logger.error("waitToLock exception", e);
			doDelete = true;
			throw e;
		} finally {
			// 如果需要删除节点
			if (doDelete) {
				deleteLockNode();
			}
		}
		logger.info("get Lock end,haveTheLock=" + haveTheLock);
		return haveTheLock;
	}

	/**
	 * createLockNode用于在locker(basePath持久节点)下创建客户端要获取锁的[临时]顺序节点
	 * 
	 * @param path
	 * @return
	 * @throws Exception
	 */
	private String createLockNode(String path) throws Exception {
		Stat stat = zooKeeper.exists(rootPath, false);
		// 判断一下根目录是否存在
		if (stat == null) {
			zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		}
		return zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
	}

	/**
	 * 尝试获取锁,如果不加超时时间,阻塞等待。否则,就是加了超时的阻塞等待
	 * 
	 * @param time
	 * @param unit
	 * @return
	 * @throws Exception
	 */
	protected Boolean attemptLock(long time, TimeUnit unit) throws Exception {
		final long startMillis = System.currentTimeMillis();
		final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;

		boolean hasTheLock = false;
		boolean isDone = false;
		int retryCount = 0;

		// 网络闪断需要重试一试,最大重试次数MAX_RETRY_COUNT
		while (!isDone) {
			isDone = true;
			try {
				currentLockPath = createLockNode(rootPath.concat("/").concat(lockNamePre));
				hasTheLock = waitToLock(startMillis, millisToWait);

			} catch (Exception e) {
				if (retryCount++ < MAX_RETRY_COUNT) {
					isDone = false;
				} else {
					throw e;
				}
			}
		}

		return hasTheLock;
	}
}

waitToLock是最主要的代码
 

 

3、完整实现

 

package com.github.distribute.zookeeper;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.distribute.lock.DistributedLock;
public class ZookeeperDistributeLock implements DistributedLock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributeLock.class);
public static void main(String[] args) throws IOException {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 60000, null);
ZookeeperDistributeLock myLock = new ZookeeperDistributeLock(zooKeeper, "/test", "lock-");
while (true) {
try {
myLock.lock();
Thread.sleep(5000);
} catch (Exception e) {
} finally {
myLock.unLock();
}
}
}
private ZooKeeper zooKeeper;
private String rootPath;// 根路径名
private String lockNamePre;// 锁前缀
private String currentLockPath;// 用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)
private static int MAX_RETRY_COUNT = 10;// 最大重试次数
public ZookeeperDistributeLock(ZooKeeper zookeeper, String rootPath, String lockNamePre) {
logger.info("rootPath:{},lockNamePre:{}", rootPath, lockNamePre);
this.zooKeeper = zookeeper;
this.rootPath = rootPath;
this.lockNamePre = lockNamePre;
init();
}
/**
* 初始化根目录
*/
private void init() {
try {
Stat stat = zooKeeper.exists(rootPath, false);// 判断一下根目录是否存在
if (stat == null) {
zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("create rootPath error", e);
}
}
/**
* 取得锁的排序号
* 
* @param str
* @param lockName
* @return
*/
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if (index >= 0) {
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
/**
* 取得锁的排序列表
* 
* @return
* @throws Exception
*/
private List<String> getSortedChildren() throws Exception {
List<String> children = zooKeeper.getChildren(rootPath, false);
if (children != null && !children.isEmpty()) {
Collections.sort(children, new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getLockNodeNumber(lhs, lockNamePre).compareTo(getLockNodeNumber(rhs, lockNamePre));
}
});
}
logger.info("sort childRen:{}", children);
return children;
}
/**
* 该方法用于判断自己是否获取到了锁,即自己创建的顺序节点在locker的所有子节点中是否最小.如果没有获取到锁,则等待其它客户端锁的释放,
* 并且稍后重试直到获取到锁或者超时
* 
* @param startMillis
* @param millisToWait
* @param ourPath
* @return
* @throws Exception
*/
private boolean waitToLock(long startMillis, Long millisToWait) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
while (!haveTheLock) {
logger.info("get Lock Begin");
// 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
List<String> children = getSortedChildren();
String sequenceNodeName = currentLockPath.substring(rootPath.length() + 1);
// 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
int ourIndex = children.indexOf(sequenceNodeName);
/*
* 如果在getSortedChildren中没有找到之前创建的[临时]顺序节点,这表示可能由于网络闪断而导致
* Zookeeper认为连接断开而删除了我们创建的节点,此时需要抛出异常,让上一级去处理
* 上一级的做法是捕获该异常,并且执行重试指定的次数 见后面的 attemptLock方法
*/
if (ourIndex < 0) {
logger.error("not find node:{}", sequenceNodeName);
throw new Exception("节点没有找到: " + sequenceNodeName);
}
// 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
// 此时当前客户端需要等待其它客户端释放锁,
boolean isGetTheLock = ourIndex == 0;
// 如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);
if (isGetTheLock) {
logger.info("get the lock,currentLockPath:{}", currentLockPath);
haveTheLock = true;
} else {
// 如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用CountDownLatch来实现等待
String previousSequencePath = rootPath.concat("/").concat(pathToWatch);
final CountDownLatch latch = new CountDownLatch(1);
final Watcher previousListener = new Watcher() {
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
}
};
// 如果节点不存在会出现异常
zooKeeper.exists(previousSequencePath, previousListener);
// 如果有超时时间,刚到超时时间就返回
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true; // timed out - delete our node
break;
}
latch.await(millisToWait, TimeUnit.MICROSECONDS);
} else {
latch.await();
}
}
}
} catch (Exception e) {
// 发生异常需要删除节点
logger.error("waitToLock exception", e);
doDelete = true;
throw e;
} finally {
// 如果需要删除节点
if (doDelete) {
unLock();
}
}
logger.info("get Lock end,haveTheLock=" + haveTheLock);
return haveTheLock;
}
/**
* createLockNode用于在locker(basePath持久节点)下创建客户端要获取锁的[临时]顺序节点
* 
* @param path
* @return
* @throws Exception
*/
private String createLockNode(String path) throws Exception {
Stat stat = zooKeeper.exists(rootPath, false);
// 判断一下根目录是否存在
if (stat == null) {
zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
return zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
/**
* 尝试获取锁,如果不加超时时间,阻塞等待。否则,就是加了超时的阻塞等待
* 
* @param time
* @param unit
* @return
* @throws Exception
*/
private Boolean attemptLock(long time, TimeUnit unit) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
// 网络闪断需要重试一试,最大重试次数MAX_RETRY_COUNT
while (!isDone) {
isDone = true;
try {
currentLockPath = createLockNode(rootPath.concat("/").concat(lockNamePre));
hasTheLock = waitToLock(startMillis, millisToWait);
} catch (Exception e) {
if (retryCount++ < MAX_RETRY_COUNT) {
isDone = false;
} else {
throw e;
}
}
}
return hasTheLock;
}
public boolean tryLock() throws Exception {
logger.info("tryLock Lock Begin");
// 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
List<String> children = getSortedChildren();
String sequenceNodeName = currentLockPath.substring(rootPath.length() + 1);
// 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
int ourIndex = children.indexOf(sequenceNodeName);
if (ourIndex < 0) {
logger.error("not find node:{}", sequenceNodeName);
throw new Exception("节点没有找到: " + sequenceNodeName);
}
// 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
return ourIndex == 0;
}
public void lock() throws Exception {
// -1,null表示阻塞等待,不设置超时时间
attemptLock(-1, null);
}
public boolean lock(long time, TimeUnit unit) throws Exception {
if (time <= 0) {
throw new Exception("Lock wait for time must greater than 0");
}
if (unit == null) {
throw new Exception("TimeUnit can not be null");
}
return attemptLock(time, unit);
}
public void unLock() {
try {
zooKeeper.delete(currentLockPath, -1);
} catch (Exception e) {
logger.error("unLock error", e);
}
}
}

 

三、对比

在文章Redis分布式锁—-悲观锁实现,以秒杀系统为例,我们用redis也实现了分布式锁。zk的方案最大的优势在于避免结点挂掉后导致的死锁;redis的方案最大的优势在于性能超强;在实际生产过程中,结合自身情况来决定最适合的分布式锁。

更多技术请关注笔者微信技术公众号”单例模式”

zookeeper分布式锁实现原理(分布式锁怎么实现)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/128622.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • RTP协议全解析(H264码流和PS流)「建议收藏」

    RTP协议全解析(H264码流和PS流)「建议收藏」1RTPHeader解析2、RTP荷载H264码流2.1、单个NAL单元包2.2、分片单元(FU-A)3、RTP荷载PS流3.1、PS包头3.2、系统标题3.3、节目映射流3.4、PES分组头部

  • Java Double转Bigdecimal丢失精度原因学习

    Java Double转Bigdecimal丢失精度原因学习记录学习Double转Bigdecimal丢失精度的原因注意事项:不能直接使用Bigdecimal的构造函数传double进行转换,部分数值会丢失精度,因为计算机是二进制的Double无法精确的储存一些小数位,0.1的double数据存储的值实际上并不真的等于0.1如该方式将0.1转换为Bigdecimal得到的结果是0.1000000000000000055511151231257827021181583404541015625这是为什么呢,以往只是知道结论知道不能这么用,也大概知道是因为do

  • python的缩进规则是什么意思_python什么情况下需要缩进

    python的缩进规则是什么意思_python什么情况下需要缩进一般的语言都是通过{}或end来作为代码块的标记,而Python则是通过缩进来识别代码块的。对于Python的这种“缩进”风格,喜欢它的人说这是一种乐趣;不喜欢它的人说这是一门需要卡尺的语言,因为需要使用“游标卡尺”去测量每行代码的缩进。不管怎么样,Python的开发者有意让违反了缩进规则的程序不能通过编译,以此让程序员养成良好的编程习惯。并且Python语言利用缩进表示语句块的开始和退出,而非使…

    2022年10月13日
  • iOS5.1.1完美越狱教程

    iOS5.1.1完美越狱教程pod2g和绿毒都没有食言,iOS 5.1.1完美越狱工具在北京时间5月25日晚20点30分出现在了绿毒的官方上。与上次不同的是,本次越狱一口气发了三个版本——OS X、Windows、以及linux,狱友们不用再熬夜了!本次越狱所支持的设备:- iPad3 (WiFi/CDMA/Global)- iPad2 (WiFi/CDMA/GSM)- iPad1- iPhone

  • MySQL大小写敏感问题和命名规范

    MySQL大小写敏感问题和命名规范

  • 数据仓库常见建模方法与建模实例演示[通俗易懂]

    数据仓库常见建模方法与建模实例演示[通俗易懂]1.数据仓库建模的目的?为什么要进行数据仓库建模?大数据的数仓建模是通过建模的方法更好的组织、存储数据,以便在性能、成本、效率和数据质量之间找到最佳平衡点。一般主要从下面四点考虑访问性能:能够快速查询所需的数据,减少数据I/O 数据成本:减少不必要的数据冗余,实现计算结果数据复用,降低大数据系统中的存储成本和计算成本 使用效率:改善用户应用体验,提高使用数据的效率 数据质量…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号