基于 ZooKeeper 的分布式锁和队列( 下 )

(点击上方公众号,可快速关注)

来源:阿凡卢,

www.cnblogs.com/luxiaoxun/p/4889764.html

如有好文章投稿,请点击 → 这里了解详情

Zookeeper统一操作ZooKeeperOperation接口:

public interface ZooKeeperOperation {

     

    /**

     * Performs the operation - which may be involved multiple times if the connection

     * to ZooKeeper closes during this operation

     *

     * @return the result of the operation or null

     * @throws KeeperException

     * @throws InterruptedException

     */

    public boolean execute() throws KeeperException, InterruptedException;

}

因为Zookeeper的操作可能会失败(例如连接断开),这个类封装了多次重试的逻辑:

/**

 *

 * Licensed to the Apache Software Foundation (ASF) under one or more

 * contributor license agreements.  See the NOTICE file distributed with

 * this work for additional information regarding copyright ownership.

 * The ASF licenses this file to You under the Apache License, Version 2.0

 * (the "License"); you may not use this file except in compliance with

 * the License.  You may obtain a copy of the License at

 *

 * http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package org.apache.zookeeper.recipes.lock;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

import org.apache.zookeeper.recipes.lock.ZooKeeperOperation;

 

import java.util.List;

import java.util.concurrent.atomic.AtomicBoolean;

 

/**

 * A base class for protocol implementations which provides a number of higher 

 * level helper methods for working with ZooKeeper along with retrying synchronous

 *  operations if the connection to ZooKeeper closes such as 

 *  {@link #retryOperation(ZooKeeperOperation)}

 *

 */

class ProtocolSupport {

    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);

 

    protected final ZooKeeper zookeeper;

    private AtomicBoolean closed = new AtomicBoolean(false);

    private long retryDelay = 500L;

    private int retryCount = 10;

    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

 

    public ProtocolSupport(ZooKeeper zookeeper) {

        this.zookeeper = zookeeper;

    }

 

    /**

     * Closes this strategy and releases any ZooKeeper resources; but keeps the

     *  ZooKeeper instance open

     */

    public void close() {

        if (closed.compareAndSet(false, true)) {

            doClose();

        }

    }

     

    /**

     * return zookeeper client instance

     * @return zookeeper client instance

     */

    public ZooKeeper getZookeeper() {

        return zookeeper;

    }

 

    /**

     * return the acl its using

     * @return the acl.

     */

    public List<ACL> getAcl() {

        return acl;

    }

 

    /**

     * set the acl 

     * @param acl the acl to set to

     */

    public void setAcl(List<ACL> acl) {

        this.acl = acl;

    }

 

    /**

     * get the retry delay in milliseconds

     * @return the retry delay

     */

    public long getRetryDelay() {

        return retryDelay;

    }

 

    /**

     * Sets the time waited between retry delays

     * @param retryDelay the retry delay

     */

    public void setRetryDelay(long retryDelay) {

        this.retryDelay = retryDelay;

    }

 

    /**

     * Allow derived classes to perform 

     * some custom closing operations to release resources

     */

    protected void doClose() {

    }

 

 

    /**

     * Perform the given operation, retrying if the connection fails

     * @return object. it needs to be cast to the callee"s expected 

     * return type.

     */

    protected Object retryOperation(ZooKeeperOperation operation) 

        throws KeeperException, InterruptedException {

        KeeperException exception = null;

        for (int i = 0; i < retryCount; i++) {

            try {

                return operation.execute();

            } catch (KeeperException.SessionExpiredException e) {

                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);

                throw e;

            } catch (KeeperException.ConnectionLossException e) {

                if (exception == null) {

                    exception = e;

                }

                LOG.debug("Attempt " + i + " failed with connection loss so " +

                        "attempting to reconnect: " + e, e);

                retryDelay(i);

            }

        }

        throw exception;

    }

 

    /**

     * Ensures that the given path exists with no data, the current

     * ACL and no flags

     * @param path

     */

    protected void ensurePathExists(String path) {

        ensureExists(path, null, acl, CreateMode.PERSISTENT);

    }

 

    /**

     * Ensures that the given path exists with the given data, ACL and flags

     * @param path

     * @param acl

     * @param flags

     */

    protected void ensureExists(final String path, final byte[] data,

            final List<ACL> acl, final CreateMode flags) {

        try {

            retryOperation(new ZooKeeperOperation() {

                public boolean execute() throws KeeperException, InterruptedException {

                    Stat stat = zookeeper.exists(path, false);

                    if (stat != null) {

                        return true;

                    }

                    zookeeper.create(path, data, acl, flags);

                    return true;

                }

            });

        } catch (KeeperException e) {

            LOG.warn("Caught: " + e, e);

        } catch (InterruptedException e) {

            LOG.warn("Caught: " + e, e);

        }

    }

 

    /**

     * Returns true if this protocol has been closed

     * @return true if this protocol is closed

     */

    protected boolean isClosed() {

        return closed.get();

    }

 

    /**

     * Performs a retry delay if this is not the first attempt

     * @param attemptCount the number of the attempts performed so far

     */

    protected void retryDelay(int attemptCount) {

        if (attemptCount > 0) {

            try {

                Thread.sleep(attemptCount * retryDelay);

            } catch (InterruptedException e) {

                LOG.debug("Failed to sleep: " + e, e);

            }

        }

    }

}

下面这个接口用于在本客户端获取到lock和释放lock时触发回调操作:

public interface LockListener {

    /**
     * Callback invoked when the lock is acquired.
     */
    void lockAcquired();

    /**
     * Callback invoked when the lock is released.
     */
    void lockReleased();

}

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用 create() 时节点路径名以 “qn-” 结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:“_path-to-queue-node_/qn-X”,X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

实现步骤基本如下:

前提:需要一个队列root节点dir

入队:使用create()创建节点,将共享数据data放在该节点上,节点类型为PERSISTENT_SEQUENTIAL,永久顺序性的(也可以设置为临时的,看需求)。

出队:因为队列可能为空,2种方式处理:一种如果为空则wait等待,一种返回异常。

等待方式:这里使用了CountDownLatch的等待和Watcher的通知机制,使用了TreeMap的排序获取节点顺序最小的数据(FIFO)。

抛出异常:getChildren()获取队列数据时,如果size==0则抛出异常。

一个分布式Queue的实现,详细代码:

package org.apache.zookeeper.recipes.queue;

 

import java.util.List;

import java.util.NoSuchElementException;

import java.util.TreeMap;

import java.util.concurrent.CountDownLatch;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

 

/**

 * 

 * A <a href="http://www.gunmi.cn/v/package.html">protocol to implement a distributed queue</a>.

 * 

 */

public class DistributedQueue {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);

 

    private final String dir;

 

    private ZooKeeper zookeeper;

    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

 

    private final String prefix = "qn-";

 

    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){

        this.dir = dir;

 

        if(acl != null){

            this.acl = acl;

        }

        this.zookeeper = zookeeper;

         

        //Add root dir first if not exists

        if (zookeeper != null) {

            try {

                Stat s = zookeeper.exists(dir, false);

                if (s == null) {

                    zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);

                }

            } catch (KeeperException e) {

                LOG.error(e.toString());

            } catch (InterruptedException e) {

                LOG.error(e.toString());

            }

        }

    }

 

    /**

     * Returns a Map of the children, ordered by id.

     * @param watcher optional watcher on getChildren() operation.

     * @return map from id to child name for all children

     */

    private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();

 

        List<String> childNames = null;

        try{

            childNames = zookeeper.getChildren(dir, watcher);

        }catch (KeeperException.NoNodeException e){

            throw e;

        }

 

        for(String childName : childNames){

            try{

                //Check format

                if(!childName.regionMatches(0, prefix, 0, prefix.length())){

                    LOG.warn("Found child node with improper name: " + childName);

                    continue;

                }

                String suffix = childName.substring(prefix.length());

                Long childId = new Long(suffix);

                orderedChildren.put(childId,childName);

            }catch(NumberFormatException e){

                LOG.warn("Found child node with improper format : " + childName + " " + e,e);

            }

        }

 

        return orderedChildren;

    }

 

    /**

     * Find the smallest child node.

     * @return The name of the smallest child node.

     */

    private String smallestChildName() throws KeeperException, InterruptedException {

        long minId = Long.MAX_VALUE;

        String minName = "";

 

        List<String> childNames = null;

 

        try{

            childNames = zookeeper.getChildren(dir, false);

        }catch(KeeperException.NoNodeException e){

            LOG.warn("Caught: " +e,e);

            return null;

        }

 

        for(String childName : childNames){

            try{

                //Check format

                if(!childName.regionMatches(0, prefix, 0, prefix.length())){

                    LOG.warn("Found child node with improper name: " + childName);

                    continue;

                }

                String suffix = childName.substring(prefix.length());

                long childId = Long.parseLong(suffix);

                if(childId < minId){

                    minId = childId;

                    minName = childName;

                }

            }catch(NumberFormatException e){

                LOG.warn("Found child node with improper format : " + childName + " " + e,e);

            }

        }

 

        if(minId < Long.MAX_VALUE){

            return minName;

        }else{

            return null;

        }

    }

 

    /**

     * Return the head of the queue without modifying the queue.

     * @return the data at the head of the queue.

     * @throws NoSuchElementException

     * @throws KeeperException

     * @throws InterruptedException

     */

    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren;

 

        // element, take, and remove follow the same pattern.

        // We want to return the child node with the smallest sequence number.

        // Since other clients are remove()ing and take()ing nodes concurrently, 

        // the child with the smallest sequence number in orderedChildren might be gone by the time we check.

        // We don"t call getChildren again until we have tried the rest of the nodes in sequence order.

        while(true){

            try{

                orderedChildren = orderedChildren(null);

            }catch(KeeperException.NoNodeException e){

                throw new NoSuchElementException();

            }

            if(orderedChildren.size() == 0 ) throw new NoSuchElementException();

 

            for(String headNode : orderedChildren.values()){

                if(headNode != null){

                    try{

                        return zookeeper.getData(dir+"/"+headNode, false, null);

                    }catch(KeeperException.NoNodeException e){

                        //Another client removed the node first, try next

                    }

                }

            }

 

        }

    }

 

 

    /**

     * Attempts to remove the head of the queue and return it.

     * @return The former head of the queue

     * @throws NoSuchElementException

     * @throws KeeperException

     * @throws InterruptedException

     */

    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren;

        // Same as for element.  Should refactor this.

        while(true){

            try{

                orderedChildren = orderedChildren(null);

            }catch(KeeperException.NoNodeException e){

                throw new NoSuchElementException();

            }

            if(orderedChildren.size() == 0) throw new NoSuchElementException();

 

            for(String headNode : orderedChildren.values()){

                String path = dir +"/"+headNode;

                try{

                    byte[] data = http://www.gunmi.cn/v/zookeeper.getData(path, false, null);

                    zookeeper.delete(path, -1);

                    return data;

                }catch(KeeperException.NoNodeException e){

                    // Another client deleted the node first.

                }

            }

        }

    }

 

    private class LatchChildWatcher implements Watcher {

 

        CountDownLatch latch;

 

        public LatchChildWatcher(){

            latch = new CountDownLatch(1);

        }

 

        public void process(WatchedEvent event){

            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + 

                    event.getState() + " type " + event.getType());

            latch.countDown();

        }

        public void await() throws InterruptedException {

            latch.await();

        }

    }

 

    /**

     * Removes the head of the queue and returns it, blocks until it succeeds.

     * @return The former head of the queue

     * @throws NoSuchElementException

     * @throws KeeperException

     * @throws InterruptedException

     */

    public byte[] take() throws KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren;

        // Same as for element.  Should refactor this.

        while(true){

            LatchChildWatcher childWatcher = new LatchChildWatcher();

            try{

                orderedChildren = orderedChildren(childWatcher);

            }catch(KeeperException.NoNodeException e){

                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);

                continue;

            }

            if(orderedChildren.size() == 0){

                childWatcher.await();

                continue;

            }

 

            for(String headNode : orderedChildren.values()){

                String path = dir +"/"+headNode;

                try{

                    byte[] data = http://www.gunmi.cn/v/zookeeper.getData(path, false, null);

                    zookeeper.delete(path, -1);

                    return data;

                }catch(KeeperException.NoNodeException e){

                    // Another client deleted the node first.

                }

            }

        }

    }

 

    /**

     * Inserts data into queue.

     * @param data

     * @return true if data was successfully added

     */

    public boolean offer(byte[] data) throws KeeperException, InterruptedException{

        for(;;){

            try{

                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);

                return true;

            }catch(KeeperException.NoNodeException e){

                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);

            }

        }

    }

 

    /**

     * Returns the data at the first element of the queue, or null if the queue is empty.

     * @return data at the first element of the queue, or null.

     * @throws KeeperException

     * @throws InterruptedException

     */

    public byte[] peek() throws KeeperException, InterruptedException{

        try{

            return element();

        }catch(NoSuchElementException e){

            return null;

        }

    }

 

    /**

     * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.

     * @return Head of the queue or null.

     * @throws KeeperException

     * @throws InterruptedException

     */

    public byte[] poll() throws KeeperException, InterruptedException {

        try{

            return remove();

        }catch(NoSuchElementException e){

            return null;

        }

    }

}

Apache Curator

Curator是一个封装Zookeeper操作的库,使用这个库的好处是Curator帮你管理和Zookeeper的连接,当连接有问题时会自动重试(retry)。

// Retry policy built with arguments (1000, 3); see the Curator
// ExponentialBackoffRetry documentation for their exact meaning.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
// Start the client before using it with any recipe.
client.start();

Curator已经封装了一些常用的Recipes

Distributed Lock

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// Only enter the critical section when acquire() reports success.
boolean acquired = lock.acquire(maxWait, waitUnit);
if (acquired) {
    try {
        // do some work inside of the critical section here
    } finally {
        // Always release, even if the work above throws.
        lock.release();
    }
}

Leader Election

LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
};

LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();  // not required, but this is behavior that you will probably expect
selector.start();

参考:

  • http://zookeeper.apache.org/doc/trunk/recipes.html

  • http://curator.apache.org/curator-recipes/index.html

  • 看完本文有收获?请转发分享给更多人

    关注「ImportNew」,看技术干货

    基于 ZooKeeper 的分布式锁和队列( 下 )