大家好,又见面了,我是你们的朋友全栈君。
ZooKeeper 分布式锁的使用会涉及到分布式事务,
因此需要在带有 @Transactional 的方法外面再封装一层加锁的方法,如下:
/**
 * Runs {@code insertMagicCubeVehicle(request)} while holding the ZooKeeper
 * lock whose node name is derived from the request's vehicle number.
 *
 * <p>The lock is taken before the insert is invoked and released in a
 * {@code finally} block after it returns or throws.
 * NOTE(review): the accompanying text describes {@code insertMagicCubeVehicle}
 * as the {@code @Transactional} method; it is not visible here, so confirm that
 * the call actually goes through the transactional proxy.
 *
 * @param request the save request; its vehicle number selects the lock node
 * @return whatever {@code insertMagicCubeVehicle(request)} returns
 * @throws BizException propagated from {@code insertMagicCubeVehicle}
 */
@Override
public BizReturn<String> insertMagicCubeVehicles(MagicCubeVehicleSaveRequest request) throws BizException {
    final ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
    // Build the path exactly once: acquire and release must address the same
    // node even if the request object is modified while the insert runs.
    final String lockPath = "/magicCube-lock-" + request.getVehicleNumber();
    zkSession.acquireDistributedLock(lockPath);
    try {
        return insertMagicCubeVehicle(request);
    } finally {
        // Always give the lock back, whether the insert succeeded or threw.
        zkSession.releaseDistributedLock(lockPath);
    }
}
封装的方法:
insertMagicCubeVehicle(request);
ZooKeeperSession的工具类:
package tf56.magiccube.util.zookeeper;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/** * ZooKeeperSession * @author Administrator * */
public class ZooKeeperSession {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
public ZooKeeperSession() {
// 去连接zookeeper server,创建会话的时候,是异步去进行的
// 所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接
try {
this.zookeeper = new ZooKeeper(
"mt-zookeeper-vip:2181",
50000,
new ZooKeeperWatcher());
// 给一个状态CONNECTING,连接中
System.out.println(zookeeper.getState());
try {
// CountDownLatch
// java多线程并发同步的一个工具类
// 会传递进去一些数字,比如说1,2 ,3 都可以
// 然后await(),如果数字不是0,那么久卡住,等待
// 其他的线程可以调用coutnDown(),减1
// 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态
// 继续向下运行
connectedSemaphore.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}
/** * 获取分布式锁 * @param productId */
public void acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to acquire lock for product[id=" + productId + "]");
} catch (Exception e) {
// 如果那个车辆对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错
// NodeExistsException
int count = 0;
while(true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
System.out.println("the " + count + " times try to acquire lock for product[id=" + productId + "]......");
continue;
}
System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......");
break;
}
}
}
/** * 获取分布式锁 * @param path */
public void acquireDistributedLock(String path) {
try {
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to acquire lock for " + path);
} catch (Exception e) {
// 如果那个车辆对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错
// NodeExistsException
int count = 0;
while(true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
System.out.println("the " + count + " times try to acquire lock for " + path + "......");
continue;
}
System.out.println("success to acquire lock for " + path + " after " + count + " times try......");
break;
}
}
}
/** * 获取分布式锁 * @param path */
public boolean acquireFastFailedDistributedLock(String path) {
try {
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to acquire lock for " + path);
return true;
} catch (Exception e) {
System.out.println("fail to acquire lock for " + path);
}
return false;
}
/** * 释放掉一个分布式锁 * @param productId */
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}
/** * 释放掉一个分布式锁 * @param path */
public void releaseDistributedLock(String path) {
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for " + path + "......");
} catch (Exception e) {
e.printStackTrace();
}
}
public String getNodeData(String path) {
try {
return new String(zookeeper.getData(path, false, new Stat()));
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
public void setNodeData(String path, String data) {
try {
zookeeper.setData(path, data.getBytes(), -1);
} catch (Exception e) {
e.printStackTrace();
}
}
public void createNode(String path) {
try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
}
}
/** * 建立zk session的watcher * @author Administrator * */
private class ZooKeeperWatcher implements Watcher {
public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());
if(KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
/** * 封装单例的静态内部类 * @author Administrator * */
private static class Singleton {
private static ZooKeeperSession instance;
static {
instance = new ZooKeeperSession();
}
public static ZooKeeperSession getInstance() {
return instance;
}
}
/** * 获取单例 * @return */
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}
/** * 初始化单例的便捷方法 */
public static void init() {
getInstance();
}
}
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/129412.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...