现在的位置: 首页 > 综合 > 正文

zookeeper动态通知实现

2014年10月09日 ⁄ 综合 ⁄ 共 9888字 ⁄ 字号 评论关闭

转载请注明:@ni掌柜

    本文重点围绕ZooKeeper的Watcher,介绍通知的状态类型和事件类型,以及这些事件通知的触发条件。

 

1、浅谈Watcher接口

在ZooKeeper中,接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,分别代表通知状态和事件类型。还有一个比较重要的接口方法:

  1. abstract public void process(WatchedEvent event); 

这个方法用于处理事件通知,每个实现类都应该自己实现合适的处理逻辑。参数WatchedEvent类封装了上面提到的两个枚举类,以及触发事件对应的ZK节点path,当然,这个path不一定每次通知都有,例如会话建立,会话失效或连接断开等通知类型,就不是针对某一个单独path的。

2、如何注册Watcher
上面已经提到,Watcher接口已经提供了基本的回调方法用于处理来自服务器的通知。因此,我们只要在合适的地方实现这个接口,并传给服务器即可。下面来看看哪些是合适的地方: 
A、构造方法
  1. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) 
上面这个是ZooKeeper的一个构造方法,与ZK创建连接的时候会用到这个。这里我们重点关注第三个参数:Watcher,很显然在,这个就是一个注册Watcher的地方,传入的参数就是开发者自己Watcher接口实现。需要注意的是,这个地方注册的Watcher实现,会成为当前ZK会话的默认Watcher实现。也就是说,其它地方如果也想注册一个Watcher,那么是可以默认使用这个实现的。具体下面会涉及到。
B、API的读写接口中
  1. public Stat exists(String path, boolean watch)throws KeeperException, InterruptedException 
  2.  
  3. public List<String> getChildren(String path, boolean watch)throws KeeperException,InterruptedException 
  4.  
  5. public byte[] getData(String path,boolean watch,Stat stat)throws KeeperException,InterruptedException 
  6.  
  7. public void register(Watcher watcher) 

 

 

3、通知的状态类型与事件类型

在Watcher接口类中,已经定义了所有的状态类型和事件类型,这里把各个状态和事件类型之间的关系整理一下。

3.1状态:KeeperState.Disconnected(0)

此时客户端处于断开连接状态,和ZK集群都没有建立连接。

3.1.1事件:EventType.None(-1)

触发条件:一般是在与服务器断开连接的时候,客户端会收到这个事件。

 

3.2状态:KeeperState. SyncConnected(3)

3.2.1事件:EventType.None(-1)

触发条件:客户端与服务器成功建立会话之后,会收到这个通知。

3.2.2事件:EventType. NodeCreated (1)

触发条件:所关注的节点被创建。

3.2.3事件:EventType. NodeDeleted (2)

触发条件:所关注的节点被删除。

3.2.4事件:EventType. NodeDataChanged (3)

触发条件:所关注的节点的内容有更新。注意,这个地方说的内容是指数据的版本号dataVersion。因此,即使使用相同的数据内容来更新,还是会收到这个事件通知的。无论如何,调用了更新接口,就一定会更新dataVersion的。

3.2.5事件:EventType. NodeChildrenChanged (4)

触发条件:所关注的节点的子节点有变化。这里说的变化是指子节点的个数和组成,具体到子节点内容的变化是不会通知的。

 

3.3状态 KeeperState. AuthFailed(4)

3.3.1事件:EventType.None(-1)

 

3.4状态 KeeperState. Expired(-112)

3.4.1事件:EventType.None(-1)

 

具体代码如下:

package com.taobao.taokeeper.research.watcher;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import common.toolkit.java.util.ObjectUtil;
import common.toolkit.java.util.ThreadUtil;
/**
* 《ZooKeeper 事件类型详解》
* @author nileader/nileader@gmail.com
*
*/
public class AllZooKeeperWatcher implements Watcher{
private static final Logger LOG = LoggerFactory.getLogger( NodeDataChangedEvent.class );
AtomicInteger seq = new AtomicInteger();
private static final int SESSION_TIMEOUT = 10000;
private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181," +
"test.zookeeper.connection_string2:2181," +
"test.zookeeper.connection_string3:2181";
private static final String ZK_PATH = "/nileader";
private static final String CHILDREN_PATH = "/nileader/ch";
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
private ZooKeeper zk = null;
private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
/**
* 创建ZK连接
* @param connectString ZK服务器地址列表
* @param sessionTimeout Session超时时间
*/
public void createConnection( String connectString, int sessionTimeout ) {
this.releaseConnection();
try {
zk = new ZooKeeper( connectString, sessionTimeout,this );
LOG.info( LOG_PREFIX_OF_MAIN + "开始连接ZK服务器" );
connectedSemaphore.await();
} catch ( Exception e ) {}
}
/**
* 关闭ZK连接
*/
public void releaseConnection() {
if ( !ObjectUtil.isBlank( this.zk ) ) {
try {
this.zk.close();
} catch ( InterruptedException e ) {}
}
}
/**
* 创建节点
* @param path 节点path
* @param data 初始数据内容
* @return
*/
public boolean createPath( String path, String data ) {
try {
this.zk.exists( path, true );
LOG.info( LOG_PREFIX_OF_MAIN + "节点创建成功, Path: "
+ this.zk.create( path, //
data.getBytes(), //
Ids.OPEN_ACL_UNSAFE, //
CreateMode.PERSISTENT )
+ ", content: " + data );
} catch ( Exception e ) {}
return true;
}
/**
* 读取指定节点数据内容
* @param path 节点path
* @return
*/
public String readData( String path, boolean needWatch ) {
try {
return new String( this.zk.getData( path, needWatch, null ) );
} catch ( Exception e ) {
return "";
}
}
/**
* 更新指定节点数据内容
* @param path 节点path
* @param data 数据内容
* @return
*/
public boolean writeData( String path, String data ) {
try {
LOG.info( LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
this.zk.setData( path, data.getBytes(), -1 ) );
} catch ( Exception e ) {}
return false;
}
/**
* 删除指定节点
* @param path 节点path
*/
public void deleteNode( String path ) {
try {
this.zk.delete( path, -1 );
LOG.info( LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path );
} catch ( Exception e ) {
//TODO
}
}
/**
* 删除指定节点
* @param path 节点path
*/
public Stat exists( String path, boolean needWatch ) {
try {
return this.zk.exists( path, needWatch );
} catch ( Exception e ) {return null;}
}
/**
* 获取子节点
* @param path 节点path
*/
private List<String> getChildren( String path, boolean needWatch ) {
try {
return this.zk.getChildren( path, needWatch );
} catch ( Exception e ) {return null;}
}
public void deleteAllTestPath(){
this.deleteNode( CHILDREN_PATH );
this.deleteNode( ZK_PATH );
}
public static void main( String[] args ) {
PropertyConfigurator.configure("src/main/resources/log4j.properties");
AllZooKeeperWatcher sample = new AllZooKeeperWatcher();
sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT );
//清理节点
sample.deleteAllTestPath();
if ( sample.createPath( ZK_PATH, System.currentTimeMillis()+"" ) ) {
ThreadUtil.sleep( 3000 );
//读取数据
sample.readData( ZK_PATH, true );
//读取子节点
sample.getChildren( ZK_PATH, true );
//更新数据
sample.writeData( ZK_PATH, System.currentTimeMillis()+"" );
ThreadUtil.sleep( 3000 );
//创建子节点
sample.createPath( CHILDREN_PATH, System.currentTimeMillis()+"" );
}
ThreadUtil.sleep( 3000 );
//清理节点
sample.deleteAllTestPath();
ThreadUtil.sleep( 3000 );
sample.releaseConnection();
}
/**
* 收到来自Server的Watcher通知后的处理。
*/
@Override
public void process( WatchedEvent event ) {
ThreadUtil.sleep( 200 );
if ( ObjectUtil.isBlank( event ) ) {
return;
}
// 连接状态
KeeperState keeperState = event.getState();
// 事件类型
EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
LOG.info( logPrefix + "收到Watcher通知" );
LOG.info( logPrefix + "连接状态:\t" + keeperState.toString() );
LOG.info( logPrefix + "事件类型:\t" + eventType.toString() );
if ( KeeperState.SyncConnected == keeperState ) {
// 成功连接上ZK服务器
if ( EventType.None == eventType ) {
LOG.info( logPrefix + "成功连接上ZK服务器" );
connectedSemaphore.countDown();
} else if ( EventType.NodeCreated == eventType ) {
LOG.info( logPrefix + "节点创建" );
this.exists( path, true );
} else if ( EventType.NodeDataChanged == eventType ) {
LOG.info( logPrefix + "节点数据更新" );
LOG.info( logPrefix + "数据内容: " + this.readData( ZK_PATH, true ) );
} else if ( EventType.NodeChildrenChanged == eventType ) {
LOG.info( logPrefix + "子节点变更" );
LOG.info( logPrefix + "子节点列表:" + this.getChildren( ZK_PATH, true ) );
} else if ( EventType.NodeDeleted == eventType ) {
LOG.info( logPrefix + "节点 " + path + " 被删除" );
}
} else if ( KeeperState.Disconnected == keeperState ) {
LOG.info( logPrefix + "与ZK服务器断开连接" );
} else if ( KeeperState.AuthFailed == keeperState ) {
LOG.info( logPrefix + "权限检查失败" );
} else if ( KeeperState.Expired == keeperState ) {
LOG.info( logPrefix + "会话失效" );
}
LOG.info( "--------------------------------------------" );
}
}

 

4、程序实例

这里有一个可以用来演示“触发事件通知”和“如何处理这些事件通知”的程序AllZooKeeperWatcher.java。

在这里:https://github.com/alibaba/taokeeper/blob/master/taokeeper-research/src/main/java/com/taobao/taokeeper/research/watcher/AllZooKeeperWatcher.java

运行结果如下:

  1. 2012-08-05 06:35:23,779 - 【Main】开始连接ZK服务器 
  2. 2012-08-05 06:35:24,196 - 【Watcher-1】收到Watcher通知 
  3. 2012-08-05 06:35:24,196 - 【Watcher-1】连接状态:  SyncConnected 
  4. 2012-08-05 06:35:24,196 - 【Watcher-1】事件类型:  None 
  5. 2012-08-05 06:35:24,196 - 【Watcher-1】成功连接上ZK服务器 
  6. 2012-08-05 06:35:24,196 - -------------------------------------------- 
  7. 2012-08-05 06:35:24,354 - 【Main】节点创建成功, Path: /nileader, content: 1353337464279 
  8. 2012-08-05 06:35:24,554 - 【Watcher-2】收到Watcher通知 
  9. 2012-08-05 06:35:24,554 - 【Watcher-2】连接状态:  SyncConnected 
  10. 2012-08-05 06:35:24,554 - 【Watcher-2】事件类型:  NodeCreated 
  11. 2012-08-05 06:35:24,554 - 【Watcher-2】节点创建 
  12. 2012-08-05 06:35:24,582 - -------------------------------------------- 
  13. 2012-08-05 06:35:27,471 - 【Main】更新数据成功,path:/nileader,  
  14.  
  15. 2012-08-05 06:35:27,667 - 【Watcher-3】收到Watcher通知 
  16. 2012-08-05 06:35:27,667 - 【Watcher-3】连接状态:  SyncConnected 
  17. 2012-08-05 06:35:27,667 - 【Watcher-3】事件类型:  NodeDataChanged 
  18. 2012-08-05 06:35:27,667 - 【Watcher-3】节点数据更新 
  19. 2012-08-05 06:35:27,696 - 【Watcher-3】数据内容: 1353337467434 
  20. 2012-08-05 06:35:27,696 - -------------------------------------------- 
  21. 2012-08-05 06:35:30,534 - 【Main】节点创建成功, Path: /nileader/ch, content: 1353337470471 
  22. 2012-08-05 06:35:30,728 - 【Watcher-4】收到Watcher通知 
  23. 2012-08-05 06:35:30,728 - 【Watcher-4】连接状态:  SyncConnected 
  24. 2012-08-05 06:35:30,728 - 【Watcher-4】事件类型:  NodeCreated 
  25. 2012-08-05 06:35:30,728 - 【Watcher-4】节点创建 
  26. 2012-08-05 06:35:30,758 - -------------------------------------------- 
  27. 2012-08-05 06:35:30,958 - 【Watcher-5】收到Watcher通知 
  28. 2012-08-05 06:35:30,958 - 【Watcher-5】连接状态:  SyncConnected 
  29. 2012-08-05 06:35:30,958 - 【Watcher-5】事件类型:  NodeChildrenChanged 
  30. 2012-08-05 06:35:30,958 - 【Watcher-5】子节点变更 
  31. 2012-08-05 06:35:30,993 - 【Watcher-5】子节点列表:[ch] 
  32. 2012-08-05 06:35:30,993 - -------------------------------------------- 
  33. 2012-08-05 06:35:33,618 - 【Main】删除节点成功,path:/nileader/ch 
  34. 2012-08-05 06:35:33,756 - 【Main】删除节点成功,path:/nileader 
  35. 2012-08-05 06:35:33,817 - 【Watcher-6】收到Watcher通知 
  36. 2012-08-05 06:35:33,817 - 【Watcher-6】连接状态:  SyncConnected 
  37. 2012-08-05 06:35:33,817 - 【Watcher-6】事件类型:  NodeDeleted 
  38. 2012-08-05 06:35:33,817 - 【Watcher-6】节点 /nileader/ch 被删除 
  39. 2012-08-05 06:35:33,817 - -------------------------------------------- 
  40. 2012-08-05 06:35:34,017 - 【Watcher-7】收到Watcher通知 
  41. 2012-08-05 06:35:34,017 - 【Watcher-7】连接状态:  SyncConnected 
  42. 2012-08-05 06:35:34,017 - 【Watcher-7】事件类型:  NodeChildrenChanged 
  43. 2012-08-05 06:35:34,017 - 【Watcher-7】子节点变更 
  44. 2012-08-05 06:35:34,109 - 【Watcher-7】子节点列表:null 
  45. 2012-08-05 06:35:34,109 - -------------------------------------------- 
  46. 2012-08-05 06:35:34,309 - 【Watcher-8】收到Watcher通知 
  47. 2012-08-05 06:35:34,309 - 【Watcher-8】连接状态:  SyncConnected 
  48. 2012-08-05 06:35:34,309 - 【Watcher-8】事件类型:  NodeDeleted 
  49. 2012-08-05 06:35:34,309 - 【Watcher-8】节点 /nileader 被删除 
  50. 2012-08-05 06:35:34,309 - -------------------------------------------- 
 

抱歉!评论已关闭.