Zookeeper的watch机制


1.watch机制

Zookeeper的应用场景中配置中心,其中看到watch机制

1.1 什么是watch机制

zookeeper作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。

zookeeper的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。watche机制分为添加数据和监听节点。

Curator在这方面做了优化,Curator引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程ZooKeeper视图的对比过程。而且Curator会自动的再次监听,我们就不需要自己手动的重复监听了。

Curator中的cache共有三种

  • NodeCache(监听和缓存根节点变化)

  • PathChildrenCache(监听和缓存子节点变化)

  • TreeCache(监听和缓存根节点变化和子节点变化)

下面我们分别对三种cache详解

1.2. NodeCache

  • 介绍

    NodeCache是用来监听节点的数据变化的,当监听的节点的数据发生变化的时候就会回调对应的函数。

  • 增加监听

 1 //创建重试策略
 2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
 3 //创建客户端
 4 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);
 5 //开启客户端
 6 client.start();
 7 System.out.println("连接成功");
 8 //创建节点数据监听对象
 9 final NodeCache nodeCache = new NodeCache(client, "/hello");
10 //开始缓存
11 /**
12  * 参数为true:可以直接获取监听的节点,System.out.println(nodeCache.getCurrentData());为ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608
13 , data=[97, 98, 99, 100, 101]}
14  * 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null
15  */
16 nodeCache.start(true);
17 System.out.println(nodeCache.getCurrentData());
18 //添加监听对象
19 nodeCache.getListenable().addListener(new NodeCacheListener() {
20     //如果节点数据有变化,会回调该方法
21     public void nodeChanged() throws Exception {
22         String data = new String(nodeCache.getCurrentData().getData());
23         System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath()
24                 + ":data=" + data);
25     }
26 });
27 System.in.read();

 测试

修改节点数据

控制台显示

1.3. PathChildrenCache

  • 介绍

    PathChildrenCache是用来监听指定节点 的子节点变化情况

  • 增加监听

 1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
 2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);
 3 client.start();
 4 //监听指定节点的子节点变化情况包括?新增子节点 子节点数据变更 和子节点删除
 5 //true表示用于配置是否把节点内容缓存起来,如果配置为true,客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容(即:event.getData().getData())?如果为false 则无法取到数据内容(即:event.getData().getData())
 6 PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true);
 7 /**
 8  * NORMAL:  普通启动方式, 在启动时缓存子节点数据
 9  * POST_INITIALIZED_EVENT:在启动时缓存子节点数据,提示初始化
10  * BUILD_INITIAL_CACHE: 在启动时什么都不会输出
11  *  在官方解释中说是因为这种模式会在start执行执行之前先执行rebuild的方法,而rebuild的方法不会发出任何事件通知。
12  */
13 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
14 System.out.println(childrenCache.getCurrentData());
15 //添加监听
16 childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
17     @Override
18     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
19         if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){
20             System.out.println("子节点更新");
21             System.out.println("节点:"+event.getData().getPath());
22             System.out.println("数据" + new String(event.getData().getData()));
23         }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){
24             System.out.println("初始化操作");
25         }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){
26             System.out.println("删除子节点");
27             System.out.println("节点:"+event.getData().getPath());
28             System.out.println("数据" + new String(event.getData().getData()));
29         }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){
30             System.out.println("添加子节点");
31             System.out.println("节点:"+event.getData().getPath());
32             System.out.println("数据" + new String(event.getData().getData()));
33         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){
34             System.out.println("连接失效");
35         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){
36             System.out.println("重新连接");
37         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){
38             System.out.println("连接失效后稍等一会儿执行");
39         }
40     }
41 });
42 System.in.read(); // 使线程阻塞

1.4. TreeCache

  • 介绍

    TreeCache有点像上面两种Cache的结合体,NodeCache能够监听自身节点的数据变化(或者是创建该节点),PathChildrenCache能够监听自身节点下的子节点的变化,而TreeCache既能够监听自身节点的变化、也能够监听子节点的变化。

  • 添加监听

 1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
 2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);
 3 client.start();
 4 TreeCache treeCache = new TreeCache(client,"/hello");
 5 treeCache.start();
 6 System.out.println(treeCache.getCurrentData("/hello"));
 7 treeCache.getListenable().addListener(new TreeCacheListener() {
 8     @Override
 9     public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
10             if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){
11                 System.out.println(event.getData().getPath() + "节点添加");
12             }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){
13                 System.out.println(event.getData().getPath() + "节点移除");
14             }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){
15                 System.out.println(event.getData().getPath() + "节点修改");
16             }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){
17                 System.out.println("初始化完成");
18             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){
19                 System.out.println("连接过时");
20             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){
21                 System.out.println("重新连接");
22             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){
23                 System.out.println("连接过时一段时间");
24             }
25     }
26 });
27 System.in.read();

【小结】

1:Zookeeper的数据结构(树型结构)

2:节点的分类(4个)

  • 持久性(带序号、不带序号)

  • 临时性(带序号、不带序号)

3:客户端命令(创建、查询、修改、删除)

4:Zookeeper的java的api介绍(创建、查询、修改、删除)

  • Curator的客户端

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);

5:Zookeeper的watch机制

  • NodeCache

  • PathChildrenCache

  • TreeCache(监听和缓存根几点变化和子节点变化)(重点)