Zookeeper的watch机制
Zookeeper的应用场景中配置中心,其中看到watch机制
zookeeper作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。
zookeeper的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。watche机制分为添加数据和监听节点。
Curator在这方面做了优化,Curator引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程ZooKeeper视图的对比过程。而且Curator会自动的再次监听,我们就不需要自己手动的重复监听了。
Curator中的cache共有三种
-
NodeCache(监听和缓存根节点变化)
-
PathChildrenCache(监听和缓存子节点变化)
-
TreeCache(监听和缓存根节点变化和子节点变化)
下面我们分别对三种cache详解
-
NodeCache是用来监听节点的数据变化的,当监听的节点的数据发生变化的时候就会回调对应的函数。
-
1 //创建重试策略
2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
3 //创建客户端
4 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);
5 //开启客户端
6 client.start();
7 System.out.println("连接成功");
8 //创建节点数据监听对象
9 final NodeCache nodeCache = new NodeCache(client, "/hello");
10 //开始缓存
11 /**
12 * 参数为true:可以直接获取监听的节点,System.out.println(nodeCache.getCurrentData());为ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608
13 , data=[97, 98, 99, 100, 101]}
14 * 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null
15 */
16 nodeCache.start(true);
17 System.out.println(nodeCache.getCurrentData());
18 //添加监听对象
19 nodeCache.getListenable().addListener(new NodeCacheListener() {
20 //如果节点数据有变化,会回调该方法
21 public void nodeChanged() throws Exception {
22 String data = new String(nodeCache.getCurrentData().getData());
23 System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath()
24 + ":data=" + data);
25 }
26 });
27 System.in.read();
测试
修改节点数据
控制台显示
-
PathChildrenCache是用来监听指定节点 的子节点变化情况
-
1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);
3 client.start();
4 //监听指定节点的子节点变化情况包括?新增子节点 子节点数据变更 和子节点删除
5 //true表示用于配置是否把节点内容缓存起来,如果配置为true,客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容(即:event.getData().getData())?如果为false 则无法取到数据内容(即:event.getData().getData())
6 PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true);
7 /**
8 * NORMAL: 普通启动方式, 在启动时缓存子节点数据
9 * POST_INITIALIZED_EVENT:在启动时缓存子节点数据,提示初始化
10 * BUILD_INITIAL_CACHE: 在启动时什么都不会输出
11 * 在官方解释中说是因为这种模式会在start执行执行之前先执行rebuild的方法,而rebuild的方法不会发出任何事件通知。
12 */
13 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
14 System.out.println(childrenCache.getCurrentData());
15 //添加监听
16 childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
17 @Override
18 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
19 if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){
20 System.out.println("子节点更新");
21 System.out.println("节点:"+event.getData().getPath());
22 System.out.println("数据" + new String(event.getData().getData()));
23 }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){
24 System.out.println("初始化操作");
25 }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){
26 System.out.println("删除子节点");
27 System.out.println("节点:"+event.getData().getPath());
28 System.out.println("数据" + new String(event.getData().getData()));
29 }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){
30 System.out.println("添加子节点");
31 System.out.println("节点:"+event.getData().getPath());
32 System.out.println("数据" + new String(event.getData().getData()));
33 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){
34 System.out.println("连接失效");
35 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){
36 System.out.println("重新连接");
37 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){
38 System.out.println("连接失效后稍等一会儿执行");
39 }
40 }
41 });
42 System.in.read(); // 使线程阻塞
-
TreeCache有点像上面两种Cache的结合体,NodeCache能够监听自身节点的数据变化(或者是创建该节点),PathChildrenCache能够监听自身节点下的子节点的变化,而TreeCache既能够监听自身节点的变化、也能够监听子节点的变化。
-
1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);
3 client.start();
4 TreeCache treeCache = new TreeCache(client,"/hello");
5 treeCache.start();
6 System.out.println(treeCache.getCurrentData("/hello"));
7 treeCache.getListenable().addListener(new TreeCacheListener() {
8 @Override
9 public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
10 if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){
11 System.out.println(event.getData().getPath() + "节点添加");
12 }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){
13 System.out.println(event.getData().getPath() + "节点移除");
14 }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){
15 System.out.println(event.getData().getPath() + "节点修改");
16 }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){
17 System.out.println("初始化完成");
18 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){
19 System.out.println("连接过时");
20 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){
21 System.out.println("重新连接");
22 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){
23 System.out.println("连接过时一段时间");
24 }
25 }
26 });
27 System.in.read();
1:Zookeeper的数据结构(树型结构)
2:节点的分类(4个)
-
持久性(带序号、不带序号)
-
临时性(带序号、不带序号)
3:客户端命令(创建、查询、修改、删除)
4:Zookeeper的java的api介绍(创建、查询、修改、删除)
-
Curator的客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);
5:Zookeeper的watch机制
-
NodeCache
-
PathChildrenCache
-
TreeCache(监听和缓存根几点变化和子节点变化)(重点)