转载请注明:@ni掌柜
本文重点围绕ZooKeeper的Watcher,介绍通知的状态类型和事件类型,以及这些事件通知的触发条件。
1、浅谈Watcher接口
在ZooKeeper中,接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,分别代表通知状态和事件类型。还有一个比较重要的接口方法:
- abstract public void process(WatchedEvent event);
这个方法用于处理事件通知,每个实现类都应该自己实现合适的处理逻辑。参数WatchedEvent类封装了上面提到的两个枚举类,以及触发事件对应的ZK节点path,当然,这个path不一定每次通知都有,例如会话建立,会话失效或连接断开等通知类型,就不是针对某一个单独path的。
- ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- public Stat exists(String path, boolean watch)throws KeeperException, InterruptedException
- public List<String> getChildren(String path, boolean watch)throws KeeperException,InterruptedException
- public byte[] getData(String path,boolean watch,Stat stat)throws KeeperException,InterruptedException
- public void register(Watcher watcher)
3、通知的状态类型与事件类型
在Watcher接口类中,已经定义了所有的状态类型和事件类型,这里把各个状态和事件类型之间的关系整理一下。
3.1状态:KeeperState.Disconnected(0)
此时客户端处于断开连接状态,和ZK集群都没有建立连接。
3.1.1事件:EventType.None(-1)
触发条件:一般是在与服务器断开连接的时候,客户端会收到这个事件。
3.2状态:KeeperState. SyncConnected(3)
3.2.1事件:EventType.None(-1)
触发条件:客户端与服务器成功建立会话之后,会收到这个通知。
3.2.2事件:EventType. NodeCreated (1)
触发条件:所关注的节点被创建。
3.2.3事件:EventType. NodeDeleted (2)
触发条件:所关注的节点被删除。
3.2.4事件:EventType. NodeDataChanged (3)
触发条件:所关注的节点的内容有更新。注意,这个地方说的内容是指数据的版本号dataVersion。因此,即使使用相同的数据内容来更新,还是会收到这个事件通知的。无论如何,调用了更新接口,就一定会更新dataVersion的。
3.2.5事件:EventType. NodeChildrenChanged (4)
触发条件:所关注的节点的子节点有变化。这里说的变化是指子节点的个数和组成,具体到子节点内容的变化是不会通知的。
3.3状态 KeeperState. AuthFailed(4)
3.3.1事件:EventType.None(-1)
3.4状态 KeeperState. Expired(-112)
3.4.1事件:EventType.None(-1)
具体代码如下:
package com.taobao.taokeeper.research.watcher;
import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.PropertyConfigurator;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
import common.toolkit.java.util.ObjectUtil;import common.toolkit.java.util.ThreadUtil;
/** * 《ZooKeeper 事件类型详解》 * @author nileader/nileader@gmail.com * */public class AllZooKeeperWatcher implements Watcher{
private static final Logger LOG = LoggerFactory.getLogger( NodeDataChangedEvent.class ); AtomicInteger seq = new AtomicInteger(); private static final int SESSION_TIMEOUT = 10000; private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181," + "test.zookeeper.connection_string2:2181," + "test.zookeeper.connection_string3:2181"; private static final String ZK_PATH = "/nileader"; private static final String CHILDREN_PATH = "/nileader/ch"; private static final String LOG_PREFIX_OF_MAIN = "【Main】";
private ZooKeeper zk = null;
private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
/** * 创建ZK连接 * @param connectString ZK服务器地址列表 * @param sessionTimeout Session超时时间 */ public void createConnection( String connectString, int sessionTimeout ) { this.releaseConnection(); try { zk = new ZooKeeper( connectString, sessionTimeout,this ); LOG.info( LOG_PREFIX_OF_MAIN + "开始连接ZK服务器" ); connectedSemaphore.await(); } catch ( Exception e ) {} }
/** * 关闭ZK连接 */ public void releaseConnection() { if ( !ObjectUtil.isBlank( this.zk ) ) { try { this.zk.close(); } catch ( InterruptedException e ) {} } }
/** * 创建节点 * @param path 节点path * @param data 初始数据内容 * @return */ public boolean createPath( String path, String data ) { try { this.zk.exists( path, true ); LOG.info( LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create( path, // data.getBytes(), // Ids.OPEN_ACL_UNSAFE, // CreateMode.PERSISTENT ) + ", content: " + data ); } catch ( Exception e ) {} return true; }
/** * 读取指定节点数据内容 * @param path 节点path * @return */ public String readData( String path, boolean needWatch ) { try { return new String( this.zk.getData( path, needWatch, null ) ); } catch ( Exception e ) { return ""; } }
/** * 更新指定节点数据内容 * @param path 节点path * @param data 数据内容 * @return */ public boolean writeData( String path, String data ) { try { LOG.info( LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " + this.zk.setData( path, data.getBytes(), -1 ) ); } catch ( Exception e ) {} return false; }
/** * 删除指定节点 * @param path 节点path */ public void deleteNode( String path ) { try { this.zk.delete( path, -1 ); LOG.info( LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path ); } catch ( Exception e ) { //TODO } }
/** * 删除指定节点 * @param path 节点path */ public Stat exists( String path, boolean needWatch ) { try { return this.zk.exists( path, needWatch ); } catch ( Exception e ) {return null;} }
/** * 获取子节点 * @param path 节点path */ private List<String> getChildren( String path, boolean needWatch ) { try { return this.zk.getChildren( path, needWatch ); } catch ( Exception e ) {return null;} }
public void deleteAllTestPath(){ this.deleteNode( CHILDREN_PATH ); this.deleteNode( ZK_PATH ); }
public static void main( String[] args ) {
PropertyConfigurator.configure("src/main/resources/log4j.properties");
AllZooKeeperWatcher sample = new AllZooKeeperWatcher(); sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT ); //清理节点 sample.deleteAllTestPath(); if ( sample.createPath( ZK_PATH, System.currentTimeMillis()+"" ) ) { ThreadUtil.sleep( 3000 ); //读取数据 sample.readData( ZK_PATH, true ); //读取子节点 sample.getChildren( ZK_PATH, true );
//更新数据 sample.writeData( ZK_PATH, System.currentTimeMillis()+"" ); ThreadUtil.sleep( 3000 ); //创建子节点 sample.createPath( CHILDREN_PATH, System.currentTimeMillis()+"" ); } ThreadUtil.sleep( 3000 ); //清理节点 sample.deleteAllTestPath(); ThreadUtil.sleep( 3000 ); sample.releaseConnection(); }
/** * 收到来自Server的Watcher通知后的处理。 */ @Override public void process( WatchedEvent event ) {
ThreadUtil.sleep( 200 ); if ( ObjectUtil.isBlank( event ) ) { return; } // 连接状态 KeeperState keeperState = event.getState(); // 事件类型 EventType eventType = event.getType(); // 受影响的path String path = event.getPath(); String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
LOG.info( logPrefix + "收到Watcher通知" ); LOG.info( logPrefix + "连接状态:\t" + keeperState.toString() ); LOG.info( logPrefix + "事件类型:\t" + eventType.toString() );
if ( KeeperState.SyncConnected == keeperState ) { // 成功连接上ZK服务器 if ( EventType.None == eventType ) { LOG.info( logPrefix + "成功连接上ZK服务器" ); connectedSemaphore.countDown(); } else if ( EventType.NodeCreated == eventType ) { LOG.info( logPrefix + "节点创建" ); this.exists( path, true ); } else if ( EventType.NodeDataChanged == eventType ) { LOG.info( logPrefix + "节点数据更新" ); LOG.info( logPrefix + "数据内容: " + this.readData( ZK_PATH, true ) ); } else if ( EventType.NodeChildrenChanged == eventType ) { LOG.info( logPrefix + "子节点变更" ); LOG.info( logPrefix + "子节点列表:" + this.getChildren( ZK_PATH, true ) ); } else if ( EventType.NodeDeleted == eventType ) { LOG.info( logPrefix + "节点 " + path + " 被删除" ); }
} else if ( KeeperState.Disconnected == keeperState ) { LOG.info( logPrefix + "与ZK服务器断开连接" ); } else if ( KeeperState.AuthFailed == keeperState ) { LOG.info( logPrefix + "权限检查失败" ); } else if ( KeeperState.Expired == keeperState ) { LOG.info( logPrefix + "会话失效" ); }
LOG.info( "--------------------------------------------" );
}
}
4、程序实例
这里有一个可以用来演示“触发事件通知”和“如何处理这些事件通知”的程序AllZooKeeperWatcher.java。
在这里:https://github.com/alibaba/taokeeper/blob/master/taokeeper-research/src/main/java/com/taobao/taokeeper/research/watcher/AllZooKeeperWatcher.java
运行结果如下:
- 2012-08-05 06:35:23,779 - 【Main】开始连接ZK服务器
- 2012-08-05 06:35:24,196 - 【Watcher-1】收到Watcher通知
- 2012-08-05 06:35:24,196 - 【Watcher-1】连接状态: SyncConnected
- 2012-08-05 06:35:24,196 - 【Watcher-1】事件类型: None
- 2012-08-05 06:35:24,196 - 【Watcher-1】成功连接上ZK服务器
- 2012-08-05 06:35:24,196 - --------------------------------------------
- 2012-08-05 06:35:24,354 - 【Main】节点创建成功, Path: /nileader, content: 1353337464279
- 2012-08-05 06:35:24,554 - 【Watcher-2】收到Watcher通知
- 2012-08-05 06:35:24,554 - 【Watcher-2】连接状态: SyncConnected
- 2012-08-05 06:35:24,554 - 【Watcher-2】事件类型: NodeCreated
- 2012-08-05 06:35:24,554 - 【Watcher-2】节点创建
- 2012-08-05 06:35:24,582 - --------------------------------------------
- 2012-08-05 06:35:27,471 - 【Main】更新数据成功,path:/nileader,
- 2012-08-05 06:35:27,667 - 【Watcher-3】收到Watcher通知
- 2012-08-05 06:35:27,667 - 【Watcher-3】连接状态: SyncConnected
- 2012-08-05 06:35:27,667 - 【Watcher-3】事件类型: NodeDataChanged
- 2012-08-05 06:35:27,667 - 【Watcher-3】节点数据更新
- 2012-08-05 06:35:27,696 - 【Watcher-3】数据内容: 1353337467434
- 2012-08-05 06:35:27,696 - --------------------------------------------
- 2012-08-05 06:35:30,534 - 【Main】节点创建成功, Path: /nileader/ch, content: 1353337470471
- 2012-08-05 06:35:30,728 - 【Watcher-4】收到Watcher通知
- 2012-08-05 06:35:30,728 - 【Watcher-4】连接状态: SyncConnected
- 2012-08-05 06:35:30,728 - 【Watcher-4】事件类型: NodeCreated
- 2012-08-05 06:35:30,728 - 【Watcher-4】节点创建
- 2012-08-05 06:35:30,758 - --------------------------------------------
- 2012-08-05 06:35:30,958 - 【Watcher-5】收到Watcher通知
- 2012-08-05 06:35:30,958 - 【Watcher-5】连接状态: SyncConnected
- 2012-08-05 06:35:30,958 - 【Watcher-5】事件类型: NodeChildrenChanged
- 2012-08-05 06:35:30,958 - 【Watcher-5】子节点变更
- 2012-08-05 06:35:30,993 - 【Watcher-5】子节点列表:[ch]
- 2012-08-05 06:35:30,993 - --------------------------------------------
- 2012-08-05 06:35:33,618 - 【Main】删除节点成功,path:/nileader/ch
- 2012-08-05 06:35:33,756 - 【Main】删除节点成功,path:/nileader
- 2012-08-05 06:35:33,817 - 【Watcher-6】收到Watcher通知
- 2012-08-05 06:35:33,817 - 【Watcher-6】连接状态: SyncConnected
- 2012-08-05 06:35:33,817 - 【Watcher-6】事件类型: NodeDeleted
- 2012-08-05 06:35:33,817 - 【Watcher-6】节点 /nileader/ch 被删除
- 2012-08-05 06:35:33,817 - --------------------------------------------
- 2012-08-05 06:35:34,017 - 【Watcher-7】收到Watcher通知
- 2012-08-05 06:35:34,017 - 【Watcher-7】连接状态: SyncConnected
- 2012-08-05 06:35:34,017 - 【Watcher-7】事件类型: NodeChildrenChanged
- 2012-08-05 06:35:34,017 - 【Watcher-7】子节点变更
- 2012-08-05 06:35:34,109 - 【Watcher-7】子节点列表:null
- 2012-08-05 06:35:34,109 - --------------------------------------------
- 2012-08-05 06:35:34,309 - 【Watcher-8】收到Watcher通知
- 2012-08-05 06:35:34,309 - 【Watcher-8】连接状态: SyncConnected
- 2012-08-05 06:35:34,309 - 【Watcher-8】事件类型: NodeDeleted
- 2012-08-05 06:35:34,309 - 【Watcher-8】节点 /nileader 被删除
- 2012-08-05 06:35:34,309 - --------------------------------------------