基于 ZooKeeper 的分布式锁和队列( 下 )
(点击上方公众号,可快速关注)
来源:阿凡卢,
www.cnblogs.com/luxiaoxun/p/4889764.html
如有好文章投稿,请点击 → 这里了解详情
ZooKeeper 操作的统一接口 ZooKeeperOperation:
/**
 * A unit of work against ZooKeeper that can be retried as a whole.
 */
public interface ZooKeeperOperation {

    /**
     * Performs the operation. It may be invoked multiple times if the connection
     * to ZooKeeper is lost during this operation, so implementations should be
     * safe to run again after a partial attempt.
     *
     * @return the result of the operation
     * @throws KeeperException if a ZooKeeper call made by the operation fails
     * @throws InterruptedException if the thread is interrupted while the operation runs
     */
    boolean execute() throws KeeperException, InterruptedException;
}
因为 ZooKeeper 的操作可能失败(例如连接丢失),下面这个类封装了失败后的多次重试:
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.recipes.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.recipes.lock.ZooKeeperOperation;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A base class for protocol implementations which provides a number of higher
 * level helper methods for working with ZooKeeper, along with retrying synchronous
 * operations if the connection to ZooKeeper is lost, such as
 * {@link #retryOperation(ZooKeeperOperation)}.
 */
class ProtocolSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);

    protected final ZooKeeper zookeeper;
    /** Flipped once by {@link #close()} so that {@link #doClose()} runs at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Base retry delay in milliseconds; attempt n waits n * retryDelay before the next try. */
    private long retryDelay = 500L;
    /** Maximum number of attempts made by {@link #retryOperation(ZooKeeperOperation)}. */
    private int retryCount = 10;
    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    public ProtocolSupport(ZooKeeper zookeeper) {
        this.zookeeper = zookeeper;
    }

    /**
     * Closes this strategy and releases any ZooKeeper resources, but keeps the
     * ZooKeeper instance open. Only the first call has any effect.
     */
    public void close() {
        if (closed.compareAndSet(false, true)) {
            doClose();
        }
    }

    /**
     * Returns the ZooKeeper client instance.
     * @return zookeeper client instance
     */
    public ZooKeeper getZookeeper() {
        return zookeeper;
    }

    /**
     * Returns the ACL used when this protocol creates nodes.
     * @return the acl
     */
    public List<ACL> getAcl() {
        return acl;
    }

    /**
     * Sets the ACL used when this protocol creates nodes.
     * @param acl the acl to set
     */
    public void setAcl(List<ACL> acl) {
        this.acl = acl;
    }

    /**
     * Gets the base retry delay in milliseconds.
     * @return the retry delay
     */
    public long getRetryDelay() {
        return retryDelay;
    }

    /**
     * Sets the base time, in milliseconds, waited between retries.
     * @param retryDelay the retry delay
     */
    public void setRetryDelay(long retryDelay) {
        this.retryDelay = retryDelay;
    }

    /**
     * Allows derived classes to perform some custom closing operations to
     * release resources. Called at most once, from {@link #close()}.
     */
    protected void doClose() {
    }

    /**
     * Performs the given operation, retrying if the connection to ZooKeeper is lost.
     *
     * @param operation the operation to run
     * @return the value returned by {@code operation.execute()} (a boxed {@link Boolean});
     *         it needs to be cast to the caller's expected return type
     * @throws KeeperException the session-expired exception immediately (retrying on the
     *         same session cannot help), the first connection-loss exception once all
     *         attempts have failed, or any other KeeperException thrown by the operation
     * @throws InterruptedException if the operation is interrupted
     */
    protected Object retryOperation(ZooKeeperOperation operation)
            throws KeeperException, InterruptedException {
        KeeperException exception = null;
        for (int i = 0; i < retryCount; i++) {
            try {
                return operation.execute();
            } catch (KeeperException.SessionExpiredException e) {
                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
                throw e;
            } catch (KeeperException.ConnectionLossException e) {
                if (exception == null) {
                    exception = e;
                }
                LOG.debug("Attempt " + i + " failed with connection loss so " +
                        "attempting to reconnect: " + e, e);
                // Only back off when another attempt will actually follow; sleeping
                // after the final failure just delays the exception.
                if (i + 1 < retryCount) {
                    retryDelay(i);
                }
            }
        }
        throw exception;
    }

    /**
     * Ensures that the given path exists with no data, the current ACL and no flags.
     * @param path the node path
     */
    protected void ensurePathExists(String path) {
        ensureExists(path, null, acl, CreateMode.PERSISTENT);
    }

    /**
     * Ensures that the given path exists, creating it with the given data, ACL and
     * flags if it is missing. Failures are logged rather than thrown.
     *
     * @param path  the node path
     * @param data  the data to store if the node has to be created
     * @param acl   the ACL to use if the node has to be created
     * @param flags the create mode to use if the node has to be created
     */
    protected void ensureExists(final String path, final byte[] data,
            final List<ACL> acl, final CreateMode flags) {
        try {
            retryOperation(new ZooKeeperOperation() {
                public boolean execute() throws KeeperException, InterruptedException {
                    Stat stat = zookeeper.exists(path, false);
                    if (stat != null) {
                        return true;
                    }
                    try {
                        zookeeper.create(path, data, acl, flags);
                    } catch (KeeperException.NodeExistsException e) {
                        // Another client created the node between exists() and create();
                        // the path now exists, which is what this method ensures.
                    }
                    return true;
                }
            });
        } catch (KeeperException e) {
            LOG.warn("Caught: " + e, e);
        } catch (InterruptedException e) {
            LOG.warn("Caught: " + e, e);
            // Restore the interrupt status so the caller can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns true if this protocol has been closed.
     * @return true if this protocol is closed
     */
    protected boolean isClosed() {
        return closed.get();
    }

    /**
     * Performs a retry delay if this is not the first attempt. The delay grows
     * linearly: {@code attemptCount * retryDelay} milliseconds.
     *
     * @param attemptCount the number of the attempts performed so far
     */
    protected void retryDelay(int attemptCount) {
        if (attemptCount > 0) {
            try {
                Thread.sleep(attemptCount * retryDelay);
            } catch (InterruptedException e) {
                LOG.debug("Failed to sleep: " + e, e);
                // Do not swallow the interrupt: re-assert it so the caller can react.
                Thread.currentThread().interrupt();
            }
        }
    }
}
下面这个接口定义了本客户端获取到锁(lock)和释放锁时触发的回调操作:
/**
 * Callbacks notified when this client's ownership of the lock changes.
 */
public interface LockListener {

    /**
     * Invoked once this client has acquired the lock.
     */
    void lockAcquired();

    /**
     * Invoked once the lock held by this client has been released.
     */
    void lockReleased();
}
队列(Queue)
分布式队列是一种通用的数据结构。为了在 ZooKeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node),各个分布式客户端通过调用 create() 函数向队列中放入数据。调用 create() 时,节点路径名以“qn-”结尾,并设置顺序(sequence)节点标志。由于设置了顺序标志,新的路径名具有以下字符串模式:“_path-to-queue-node_/qn-X”,其中 X 是唯一的自增序号。需要从队列中获取或移除数据的客户端首先调用 getChildren() 函数:如果有数据则获取(获取数据后可以删除,也可以不删);如果没有数据,则在队列节点(queue node)上将 watch 设置为 true,等待通知触发后再处理序号最小的节点(即从序号最小的节点中取数据)。
实现步骤基本如下:
前提:需要一个队列root节点dir
入队:使用create()创建节点,将共享数据data放在该节点上,节点类型为PERSISTENT_SEQUENTIAL,永久顺序性的(也可以设置为临时的,看需求)。
出队:因为队列可能为空,所以有两种处理方式:一种是在队列为空时阻塞等待(wait),另一种是直接抛出异常。
等待方式:这里使用了CountDownLatch的等待和Watcher的通知机制,使用了TreeMap的排序获取节点顺序最小的数据(FIFO)。
抛出异常:getChildren()获取队列数据时,如果size==0则抛出异常。
一个分布式Queue的实现,详细代码:
package org.apache.zookeeper.recipes.queue;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
/**
*
* A <a href="http://www.gunmi.cn/v/package.html">protocol to implement a distributed queue</a>.
*
*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
private final String dir;
private ZooKeeper zookeeper;
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private final String prefix = "qn-";
public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
this.dir = dir;
if(acl != null){
this.acl = acl;
}
this.zookeeper = zookeeper;
//Add root dir first if not exists
if (zookeeper != null) {
try {
Stat s = zookeeper.exists(dir, false);
if (s == null) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
LOG.error(e.toString());
} catch (InterruptedException e) {
LOG.error(e.toString());
}
}
}
/**
* Returns a Map of the children, ordered by id.
* @param watcher optional watcher on getChildren() operation.
* @return map from id to child name for all children
*/
private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();
List<String> childNames = null;
try{
childNames = zookeeper.getChildren(dir, watcher);
}catch (KeeperException.NoNodeException e){
throw e;
}
for(String childName : childNames){
try{
//Check format
if(!childName.regionMatches(0, prefix, 0, prefix.length())){
LOG.warn("Found child node with improper name: " + childName);
continue;
}
String suffix = childName.substring(prefix.length());
Long childId = new Long(suffix);
orderedChildren.put(childId,childName);
}catch(NumberFormatException e){
LOG.warn("Found child node with improper format : " + childName + " " + e,e);
}
}
return orderedChildren;
}
/**
* Find the smallest child node.
* @return The name of the smallest child node.
*/
private String smallestChildName() throws KeeperException, InterruptedException {
long minId = Long.MAX_VALUE;
String minName = "";
List<String> childNames = null;
try{
childNames = zookeeper.getChildren(dir, false);
}catch(KeeperException.NoNodeException e){
LOG.warn("Caught: " +e,e);
return null;
}
for(String childName : childNames){
try{
//Check format
if(!childName.regionMatches(0, prefix, 0, prefix.length())){
LOG.warn("Found child node with improper name: " + childName);
continue;
}
String suffix = childName.substring(prefix.length());
long childId = Long.parseLong(suffix);
if(childId < minId){
minId = childId;
minName = childName;
}
}catch(NumberFormatException e){
LOG.warn("Found child node with improper format : " + childName + " " + e,e);
}
}
if(minId < Long.MAX_VALUE){
return minName;
}else{
return null;
}
}
/**
* Return the head of the queue without modifying the queue.
* @return the data at the head of the queue.
* @throws NoSuchElementException
* @throws KeeperException
* @throws InterruptedException
*/
public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
// element, take, and remove follow the same pattern.
// We want to return the child node with the smallest sequence number.
// Since other clients are remove()ing and take()ing nodes concurrently,
// the child with the smallest sequence number in orderedChildren might be gone by the time we check.
// We don"t call getChildren again until we have tried the rest of the nodes in sequence order.
while(true){
try{
orderedChildren = orderedChildren(null);
}catch(KeeperException.NoNodeException e){
throw new NoSuchElementException();
}
if(orderedChildren.size() == 0 ) throw new NoSuchElementException();
for(String headNode : orderedChildren.values()){
if(headNode != null){
try{
return zookeeper.getData(dir+"/"+headNode, false, null);
}catch(KeeperException.NoNodeException e){
//Another client removed the node first, try next
}
}
}
}
}
/**
* Attempts to remove the head of the queue and return it.
* @return The former head of the queue
* @throws NoSuchElementException
* @throws KeeperException
* @throws InterruptedException
*/
public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
while(true){
try{
orderedChildren = orderedChildren(null);
}catch(KeeperException.NoNodeException e){
throw new NoSuchElementException();
}
if(orderedChildren.size() == 0) throw new NoSuchElementException();
for(String headNode : orderedChildren.values()){
String path = dir +"/"+headNode;
try{
byte[] data = http://www.gunmi.cn/v/zookeeper.getData(path, false, null);
zookeeper.delete(path, -1);
return data;
}catch(KeeperException.NoNodeException e){
// Another client deleted the node first.
}
}
}
}
private class LatchChildWatcher implements Watcher {
CountDownLatch latch;
public LatchChildWatcher(){
latch = new CountDownLatch(1);
}
public void process(WatchedEvent event){
LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
event.getState() + " type " + event.getType());
latch.countDown();
}
public void await() throws InterruptedException {
latch.await();
}
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
* @return The former head of the queue
* @throws NoSuchElementException
* @throws KeeperException
* @throws InterruptedException
*/
public byte[] take() throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
while(true){
LatchChildWatcher childWatcher = new LatchChildWatcher();
try{
orderedChildren = orderedChildren(childWatcher);
}catch(KeeperException.NoNodeException e){
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
continue;
}
if(orderedChildren.size() == 0){
childWatcher.await();
continue;
}
for(String headNode : orderedChildren.values()){
String path = dir +"/"+headNode;
try{
byte[] data = http://www.gunmi.cn/v/zookeeper.getData(path, false, null);
zookeeper.delete(path, -1);
return data;
}catch(KeeperException.NoNodeException e){
// Another client deleted the node first.
}
}
}
}
/**
* Inserts data into queue.
* @param data
* @return true if data was successfully added
*/
public boolean offer(byte[] data) throws KeeperException, InterruptedException{
for(;;){
try{
zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}catch(KeeperException.NoNodeException e){
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
}
}
}
/**
* Returns the data at the first element of the queue, or null if the queue is empty.
* @return data at the first element of the queue, or null.
* @throws KeeperException
* @throws InterruptedException
*/
public byte[] peek() throws KeeperException, InterruptedException{
try{
return element();
}catch(NoSuchElementException e){
return null;
}
}
/**
* Attempts to remove the head of the queue and return it. Returns null if the queue is empty.
* @return Head of the queue or null.
* @throws KeeperException
* @throws InterruptedException
*/
public byte[] poll() throws KeeperException, InterruptedException {
try{
return remove();
}catch(NoSuchElementException e){
return null;
}
}
}
Apache Curator
Curator 是一个封装 ZooKeeper 操作的库。使用它的好处是 Curator 会帮你管理与 ZooKeeper 的连接,当连接出现问题时会自动重试(retry)。
// ExponentialBackoffRetry(baseSleepTimeMs, maxRetries): 1000 ms base sleep, at most 3 retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
Curator 已经封装了一些常用的 Recipes,例如:
Distributed Lock
// Try to take the distributed mutex, waiting at most maxWait (in waitUnit units).
InterProcessMutex mutex = new InterProcessMutex(client, lockPath);
if (mutex.acquire(maxWait, waitUnit)) {
    try {
        // do some work inside of the critical section here
    } finally {
        // Release in finally so the mutex is freed even if the critical section throws.
        mutex.release();
    }
}
Leader Election
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
};
LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue(); // not required, but this is behavior that you will probably expect
selector.start();
参考:
http://zookeeper.apache.org/doc/trunk/recipes.html
http://curator.apache.org/curator-recipes/index.html
看完本文有收获?请转发分享给更多人
关注「ImportNew」,看技术干货
- 阿里自研分布式强一致关系型数据库——X-DB
- 〖技术众包〗寻高通660 全屏手机量产方案、基于Nrf52840的智能手
- 学界 | 阿里NIPS 2017 Workshop论文:基于TensorFlow的深度模型
- 〖技术众包〗基于MT7688核心板外围电路的原理图设计、无屏行车记
- 议程来了,上海天然气分布式能源考察
- 基于无套利原则的银行间国债市场异常价格研究
- 肇庆市分布式光伏综合示范项目首期工程成功并网发电
- 现代教学丨基于标准 培育素养 ——“基于课程标准教学的区域性转
- Redis 分布式锁的正确实现方式( Java 版 )
- 2017 BDTC大会,巨杉出品金融级分布式架构专场,邀您参加!