现在的位置: 首页 > 综合 > 正文

Zookeeper(七)分布式锁

2013年10月10日 ⁄ 综合 ⁄ 共 4091字 ⁄ 字号 评论关闭

获取锁实现思路:

1.     首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node

2.     希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);

        例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2

3.     当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点

4.     步骤3中获取小于自己的节点不存在 && 最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。

5.     客户端监视(watch)相对自己次小的有序临时节点状态

6.     如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。

 public synchronized boolean lock() throws KeeperException, InterruptedException {
        if (isClosed()) {
            return false;
        }
        // 如果锁目录不存在, 创建锁目录   节点类型为永久类型
        ensurePathExists(dir);

        // 创建锁节点,节点类型EPHEMERAL_SEQUENTIAL 
        // 如果不存在小于自己的节点   并且最小节点 与当前创建的节点相同  获得锁
        // 未获得成功,对当前次小节点设置watch
        return (Boolean) retryOperation(zop);
    }

创建锁目录

    protected void ensurePathExists(String path) {
        ensureExists(path, null, acl, CreateMode.PERSISTENT);
    }
    protected void ensureExists(final String path, final byte[] data,
            final List<ACL> acl, final CreateMode flags) {
        try {
            retryOperation(new ZooKeeperOperation() {
                public boolean execute() throws KeeperException, InterruptedException {
                	// 创建锁目录
                    Stat stat = zookeeper.exists(path, false);
                    // 节点如果存在  直接返回
                    if (stat != null) {
                        return true;
                    }
                    // 创建节点
                    // data为null
                    // flags为持久化节点
                    zookeeper.create(path, data, acl, flags);
                    return true;
                }
            });
        } catch (KeeperException e) {
            LOG.warn("Caught: " + e, e);
        } catch (InterruptedException e) {
            LOG.warn("Caught: " + e, e);
        }
    }

创建锁节点,获得锁目录下的所有节点, 如果为最小节点 获得锁成功

        /**
         * the command that is run and retried for actually 
         * obtaining the lock
         * @return if the command was successful or not
         */
        public boolean execute() throws KeeperException, InterruptedException {
            do {
                if (id == null) {
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // lets try look up the current ID if we failed 
                    // in the middle of creating the znode
                    findPrefixInChildren(prefix, zookeeper, dir);
                    idName = new ZNodeName(id);
                }
                if (id != null) {
                    List<String> names = zookeeper.getChildren(dir, false);
                    if (names.isEmpty()) {
                        LOG.warn("No children in: " + dir + " when we've just " +
                        "created one! Lets recreate it...");
                        // lets force the recreation of the id
                        id = null;
                    } else {
                        // lets sort them explicitly (though they do seem to come back in order ususally :) 
                        SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
                        for (String name : names) {
                            sortedNames.add(new ZNodeName(dir + "/" + name));
                        }
                        // 获得最小节点
                        ownerId = sortedNames.first().getName();
                        // lock_1, lock_2, lock_3  传入参数lock_2  返回lock_1
                        SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            ZNodeName lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("watching less than me node: " + lastChildId);
                            }
                            // 次小节点设置watch 
                            Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
                            if (stat != null) {
                                return Boolean.FALSE;
                            } else {
                                LOG.warn("Could not find the" +
                                		" stats for less than me: " + lastChildName.getName());
                            }
                        } else {
                        	// 锁目录下的最小节点  与当前客户端创建相同
                            if (isOwner()) {
                                if (callback != null) {
                                    callback.lockAcquired();
                                }
                                // 获得锁
                                return Boolean.TRUE;
                            }
                        }
                    }
                }
            }
            while (id == null);
            return Boolean.FALSE;
        }
    };

private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) 
            throws KeeperException, InterruptedException {
        	// 获取锁目录下的所有子节点
            List<String> names = zookeeper.getChildren(dir, false);
            for (String name : names) {
            	//x-sessionId-
                if (name.startsWith(prefix)) {
                    id = name;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Found id created last time: " + id);
                    }
                    break;
                }
            }
            // 当前锁目录下   没有与当前会话对应的子节点    创建子节点  节点类型为临时顺序节点
            if (id == null) {
            	// dir/x-sessionId-i
                id = zookeeper.create(dir + "/" + prefix, data, 
                        getAcl(), EPHEMERAL_SEQUENTIAL);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Created id: " + id);
                }
            }

释放锁:

释放锁非常简单,删除步骤1中创建的有序临时节点。另外,如果客户端进程死亡或连接失效,对应的节点也会被删除。

 public synchronized void unlock() throws RuntimeException {
        
        if (!isClosed() && id != null) {
            // we don't need to retry this operation in the case of failure
            // as ZK will remove ephemeral files and we don't wanna hang
            // this process when closing if we cannot reconnect to ZK
            try {
                
                ZooKeeperOperation zopdel = new ZooKeeperOperation() {
                    public boolean execute() throws KeeperException,
                        InterruptedException {
                    	// 删除节点  忽略版本
                        zookeeper.delete(id, -1);   
                        return Boolean.TRUE;
                    }
                };
                zopdel.execute();
            } catch (InterruptedException e) {
                LOG.warn("Caught: " + e, e);
                //set that we have been interrupted.
               Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } catch (KeeperException e) {
                LOG.warn("Caught: " + e, e);
                throw (RuntimeException) new RuntimeException(e.getMessage()).
                    initCause(e);
            }
            finally {
                if (callback != null) {
                    callback.lockReleased();
                }
                id = null;
            }
        }
    }

抱歉!评论已关闭.