zookeeper使用场景及示例_与Zookeeper类似的框架

zookeeper使用场景及示例_与Zookeeper类似的框架源码:https://gitee.com/suwenguang/testzookeeper集群角色:leader主follower从observer观察者:不参与写的选举,但是提供读概念:数据模型zookeeper的数据模型和文件系统类似,每一个节点称为:znode.是zookeeper中的最小数据单元。每一个znode上都可以保存数据和挂载子…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

个人博客:https://suveng.github.io/blog/​​​​​​​

源码:https://gitee.com/suwenguang/test

zookeeper集群角色:

  1. leader 主
  2. follower 从
  3. observer 观察者 :不参与写的选举,但是提供读

概念:

  1. 数据模型

    zookeeper的数据模型和文件系统类似,每一个节点称为:znode. 是zookeeper中的最小数据单元。每一个znode上都可以

    保存数据和挂载子节点。 从而构成一个层次化的属性结构

    节点特性

    持久化节点 : 节点创建后会一直存在zookeeper服务器上,直到主动删除

    持久化有序节点 :每个节点都会为它的一级子节点维护一个顺序

    临时节点 : 临时节点的生命周期和客户端的会话保持一致。当客户端会话失效,该节点自动清理

    临时有序节点 : 在临时节点上多勒一个顺序性特性

  2. 会话

  3. watcherzookeeper提供了分布式数据发布/订阅****,zookeeper****允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候

    **会触发watcher。服务端会向客户端发送一个事件通知****watcher的通知是一次性,一旦触发一次通知后,该watcher就失效

  4. ACL
    zookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。

    CREATE /READ/WRITE/DELETE/ADMIN

zookeeper的命令操作

  1. create [-s] [-e] path data acl
    -s 表示节点是否有序
    -e 表示是否为临时节点
    默认情况下,是持久化节点

  2. get path [watch]
    获得指定 path的信息

  3. set path data [version]
    修改节点 path对应的data
    乐观锁的概念
    数据库里面有一个 version 字段去控制数据行的版本号

  4. delete path [version]
    删除节点

stat信息

cversion = 0 子节点的版本号
aclVersion = 0 表示acl的版本号,修改节点权限
dataVersion = 1 表示的是当前节点数据的版本号

czxid 节点被创建时的事务ID
mzxid 节点最后一次被更新的事务ID
pzxid 当前节点下的子节点最后一次被修改时的事务ID

ctime = Sat Aug 05 20:48:26 CST 2017
mtime = Sat Aug 05 20:48:50 CST 2017

java API操作zookeeper

demo实例我已经放在gitee上面。在zookeeper_demo的模块下面。

https://gitee.com/suwenguang/test

注意需要导入jar

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
</dependency>
package 操作zookeeper.javaAPI;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** * author Veng Su * email 1344114844@qq.com * date 18-9-16 下午1:19 */
//这里实现watcher借口是为了方便构造zookeeper对象
public class APIdemo implements Watcher { 
   
    public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";

    public static CountDownLatch countDownLatch = new CountDownLatch(1);//计数器用于同步连接
    public static ZooKeeper zooKeeper;//zookeeper对象
    public static Stat stat = new Stat();//不给的全局,状态会话会丢失,导致执行失败

    public static void main(String[] args) throws Exception { 
   
        APIdemo apIdemo = new APIdemo();

        zooKeeper = new ZooKeeper(CONNECTSTRING, 500, new APIdemo());//创建zookeeper对象

        countDownLatch.await();
        System.out.println(zooKeeper.getState());
//注意我都是写死的节点。便于学习
// System.out.println(zooKeeper.getData("/suveng",apIdemo,stat));
// apIdemo.delete();//这里是删除节点,注意要有才能删除
// apIdemo.get(apIdemo);//这里是获取节点
// apIdemo.create();//创建节点
// apIdemo.set();//修改节点
// Thread.sleep(2000);
//
// Thread.sleep(1000);
// zooKeeper.create("/suveng/sds", "dddd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
// System.out.println(zooKeeper.getChildren("/suveng",true));


// 权限控制部分:ip,digest,world,super
// zooKeeper.addAuthInfo("digest","root:root".getBytes());

        //第一种方式做权限
        ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));

        List<ACL> acls=new ArrayList<>();
        acls.add(acl);
        zooKeeper.create("/suveng/auth","2122".getBytes(),acls,CreateMode.PERSISTENT);

        //第二种方式做权限
// zooKeeper.addAuthInfo("digest","root:root".getBytes());

        //创建新的客户端
        APIdemo apIdemo1=new APIdemo();
        ZooKeeper zooKeeper1=new ZooKeeper(CONNECTSTRING,5000,apIdemo1);
        zooKeeper1.getData("/suveng/auth",true,new Stat());

    }


    private void create() { 
   
        //create
        String res = null;
        try { 
   
            res = zooKeeper.create("/suveng", "hello,suwenguang".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException | InterruptedException e) { 
   
            e.printStackTrace();
        }
        System.out.println("create " + res);
    }

    private String set() throws KeeperException, InterruptedException { 
   
        zooKeeper.setData("/suveng", "dddd".getBytes(), -1);
        return null;

    }

    private void get(APIdemo apIdemo) throws KeeperException, InterruptedException { 
   
        zooKeeper.getData("/suveng", apIdemo, stat);
    }
    private void delete() throws KeeperException, InterruptedException { 
   
        zooKeeper.delete("/suveng",-1);
    }

    //watcher 的实现方法
    @Override
    public void process(WatchedEvent watchedEvent) { 
   
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { 
   
            if (watchedEvent.getType() == Event.EventType.None && watchedEvent.getPath() == null) { 
   
                countDownLatch.countDown();
                System.out.println(watchedEvent.getState());
            } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { 
   
                try { 
   
                    System.out.println("path---->" + watchedEvent.getPath() + " |data change--->" +
                            zooKeeper.getData(watchedEvent.getPath(), true, stat));
                } catch (KeeperException e) { 
   
                    e.printStackTrace();
                } catch (InterruptedException e) { 
   
                    e.printStackTrace();
                }
            } else if (watchedEvent.getType() == Event.EventType.NodeCreated) { 
   
                try { 
   
                    System.out.println("path---->" + watchedEvent.getPath() + " |created--->" +
                            zooKeeper.getData(watchedEvent.getPath(), true, stat));
                } catch (KeeperException e) { 
   
                    e.printStackTrace();
                } catch (InterruptedException e) { 
   
                    e.printStackTrace();
                }
            } else if (watchedEvent.getType() == Event.EventType.NodeDeleted) { 
   
                try { 
   
                    System.out.println("path---->" + watchedEvent.getPath() + " |data deleted--->" +
                            zooKeeper.getData(watchedEvent.getPath(), true, stat));
                } catch (KeeperException e) { 
   
                    e.printStackTrace();
                } catch (InterruptedException e) { 
   
                    e.printStackTrace();
                }
            } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { 
   
                try { 
   
                    System.out.println("path---->" + watchedEvent.getPath() + " |data child changed--->" +
                            zooKeeper.getData(watchedEvent.getPath(), true, stat));
                } catch (KeeperException e) { 
   
                    e.printStackTrace();
                } catch (InterruptedException e) { 
   
                    e.printStackTrace();
                }
            }

        }
    }
}

权限控制模式

schema:授权对象

ip : 192.168.1.1
Digest : username:password
world : 开放式的权限控制模式,数据节点的访问权限对所有用户开放。 world:anyone
super :超级用户,可以对zookeeper上的数据节点进行操作

连接状态

KeeperStat.Expired 在一定时间内客户端没有收到服务器的通知, 则认为当前的会话已经过期了。
KeeperStat.Disconnected 断开连接的状态
KeeperStat.SyncConnected 客户端和服务器端在某一个节点上建立连接,并且完成一次version、zxid同步
KeeperStat.authFailed 授权失败

事件类型

NodeCreated 当节点被创建的时候,触发
NodeChildrenChanged 表示子节点被创建、被删除、子节点数据发生变化
NodeDataChanged 节点数据发生变化
NodeDeleted 节点被删除
None 客户端和服务器端连接状态发生变化的时候,事件类型就是None

zkClient 操作zookeeper

注意:需要导入jar

zkClient只是简单的封装了Java的zookeeper API。但是总体比Java的要好用,支持递归创建和递归删除。订阅的时候也比较方便。

实例代码依然是放在了gitee上面。

package 操作zookeeper.zkclientdemo;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/** * author Veng Su * email 1344114844@qq.com * date 18-9-16 下午4:24 */
public class Zkclientdemo { 
   
    public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";

    public static ZkClient geetInstance() { 
   
        return new ZkClient(CONNECTSTRING, 5000);

    }

    @Test
    public void testcreate() { 
   
        ZkClient zkClient = Zkclientdemo.geetInstance();
        zkClient.createEphemeral("/zktest1");

        ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));
        List<ACL> acls=new ArrayList<>();
        acls.add(acl);
        zkClient.createEphemeral("/zktest2",acls);

        zkClient.createEphemeral("/zktest3","324".getBytes(),acls);

        zkClient.create("/zktest","21".getBytes(), CreateMode.PERSISTENT);
        //递归创建
        zkClient.createPersistent("/digui0/digui1/digui2/digui3",true);


        System.out.println("success");
    }

    @Test
    public void testdelete() { 
   
        ZkClient zkClient = Zkclientdemo.geetInstance();

        //普通删除
        zkClient.delete("/zktest");
        zkClient.delete("/zktest1");
        zkClient.delete("/zktest2");
        zkClient.delete("/zktest3");

        //递归删除
        zkClient.deleteRecursive("/digui0");

        boolean is = zkClient.exists("/suveng");
        System.out.println(is);
    }

    @Test
    public void testWatchers() throws InterruptedException { 
   
        ZkClient zkClient = Zkclientdemo.geetInstance();
        zkClient.subscribeDataChanges("/suveng", new IZkDataListener() { 
   
            @Override
            public void handleDataChange(String s, Object o) throws Exception { 
   
                System.out.println(s+"->"+o);
            }

            @Override
            public void handleDataDeleted(String s) throws Exception { 
   

            }
        });
        zkClient.writeData("/suveng","suwenguang");
        Thread.sleep(1000);
    }

}

curator操作zookeeper

curator是netflix公司开源的。提供了各种使用场景的封装

curator-framework 提供了fluent 风格api

curator-replice 提供实现封装

fluent风格介绍。也就是链式操作如下

package 操作zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * author Veng Su
 * email  1344114844@qq.com
 * date   18-9-16 下午6:32
 */
public class CuratorDemo {
    public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";

    public static CuratorFramework getInstance() {
        return CuratorFrameworkFactory
                .builder()
                .connectString(CONNECTSTRING)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 5))
                .build();
    }

//    测试创建

    @Test
    public void testCreate() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();
        curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/suveng/testcurator");

    }


    //    测试删除

    @Test
    public void testDelete() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();
        curatorFramework.delete()
                .deletingChildrenIfNeeded()
//                .withVersion()
                .forPath("/suveng/testcurator");

    }

    //测试getdata
    @Test
    public void testGet() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();
        Stat stat = new Stat();
        byte[] bytes = curatorFramework.getData()
                .storingStatIn(stat)
                .forPath("/suveng");
        System.out.println(new String(bytes));
        System.out.println("stat->" + stat);
    }

    //    测试setdata
    @Test
    public void testSet() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();
        curatorFramework.setData().forPath("/suveng", "knickknack".getBytes());
    }

    //测试异步
    @Test
    public void testasynchronous() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //创建临时节点
        curatorFramework.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println(Thread.currentThread().getName() + "这是异步的回调函数" + "   result code->" + curatorEvent.getResultCode() + "" +
                        "curator type" + curatorEvent.getType());
                countDownLatch.countDown();
            }
        }, executorService).forPath("/suveng/linshi");//不给其他线程默认是当前线程。
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "main线程");
        executorService.shutdown();


    }

    //事务
    @Test
    public void testTransaction() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();

        Collection<CuratorTransactionResult> curatorTransactions = curatorFramework.inTransaction().create().withMode(CreateMode.EPHEMERAL).forPath("/suveng/transaction")
                .and()
                .setData()
                .forPath("/suveng", "sdfsdfsd".getBytes()).and()
                .commit();
        for (CuratorTransactionResult result : curatorTransactions) {
            System.out.println(result.getForPath() + "->" + result.getType());
        }
    }

    //    测试watcher
//    patchcache 监视一个节点的子节点下的创建,删除,更新
//    nodecache 监视一个节点的创建,删除,更新
//    Treecache     patchcache + nodecache 合体 监视路径下的创建,删除,更新,并且缓存路径 所有子节点的数据
    @Test
    public void testWatcher() throws Exception {
        CuratorFramework curatorFramework = CuratorDemo.getInstance();
        curatorFramework.start();

//        NodeCache nodeCache=new NodeCache(curatorFramework,"/suveng",false);
//        nodeCache.start();
//
//        nodeCache.getListenable().addListener(()-> System.out.println("节点数据发生改变,改变后的数据"+new String(nodeCache.getCurrentData().getData())));
//        curatorFramework.setData().forPath("/suveng","sdfasfasfda".getBytes());
//        Thread.sleep(2000);


        PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/suveng", true);
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        childrenCache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
            switch (pathChildrenCacheEvent.getType()) {
                case CONNECTION_RECONNECTED:
                    System.out.println("reconnetc");
                    break;
                case CHILD_ADDED:
                    System.out.println("child add");
                    break;
                case CHILD_REMOVED:
                    System.out.println("child remove");
                    break;
                case CHILD_UPDATED:
                    System.out.println("child update");
                    break;
                default:
                    break;
            }
        });

//        curatorFramework.setData().forPath("/suveng","sdfsd".getBytes());

//        curatorFramework.delete().forPath("/suveng/test");
        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/suveng/test");
        TimeUnit.SECONDS.sleep(1);
        curatorFramework.setData().forPath("/suveng/test","sfs".getBytes());
        TimeUnit.SECONDS.sleep(1);

    }

    @Test
    public void testInit() {
//        创建会话的两种方式
//        第一种 normal
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .newClient(CONNECTSTRING, 5000, 5000, new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
//        第二种 fluent
        CuratorFramework curatorFramework2 = CuratorFrameworkFactory
                .builder()
                .connectString(CONNECTSTRING)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 5))
                .build();
        curatorFramework2.start();
        System.out.println("success ");

    }
}


private void makeFluent(Customer customer) {
    customer.newOrder()
            .with(6, "TAL")
            .with(5, "HPK").skippable()
            .with(3, "LGV")
            .priorityRush();
}

curator的重试策略

ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUnitilElapsed 一直重试知道规定的时间

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/186509.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Keil 使用教程(详解)「建议收藏」

    Keil 使用教程(详解)「建议收藏」(1)打开Keil,点击project新建(2)选择合适的型号,没有STC的选项,不要紧,一般C51的好多兼容的(3)不用汇编的话,下面的这个对话框选择否就可以了(4)点击新建一个文件(5)选择保存,并命名为.c(6)双击SourceGroup(7)添加.c文件(8)写完程序后,点击标号1处的按钮,然后点击标号2处的output,依次点击3和4,生成.hex文件…

  • assertEquals

    assertEqualsassertEqualspublicstaticvoidassertEquals(longexpected,longactual)Assertsthattwolongsare

  • Python和Java哪个就业前景好?

    Python和Java哪个就业前景好?Python和Java这两大编程语言,很多人都喜欢拿来比较,一个是后起之秀,潜力无限;一个是行业经典,成熟稳定,对于想从事IT技术的人员来说,很难抉择,那么,Python和Java到底哪一个就业前景更好呢?Python在国外应用相对成熟,在国内还处于起步阶段,近两年,随着人工智能、机器学习的、大数据以及云计算的兴起,Python发展势如破竹,很多企业开始进入该行列,Python人才是必不可少环…

  • 递归和迭代的区别「建议收藏」

    递归和迭代的区别「建议收藏」递归的基本概念:程序调用自身的编程技巧称为递归,是函数自己调用自己.一个函数在其定义中直接或间接调用自身的一种方法,它通常把一个大型的复杂的问题转化为一个与原问题相似的规模较小的问题来解决,可以极大的减少代码量.递归的能力在于用有限的语句来定义对象的无限集合.使用递归要注意的有两点:1)递归就是在过程或函数里面调用自身;2)在使用递归时,必须有一个明确的递归结束条件,称为递归出口.

  • 数据库建立索引常用的规则

    数据库建立索引常用的规则数据库建立索引常用的规则如下:1、表的主键、外键必须有索引; 2、数据量…

  • manage.py作用_python源码库

    manage.py作用_python源码库源码目录结构ApiResponse这个类没啥好说的classApiResponse(Response):"""继承了requests模块中的Response类

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号