基于ZooKeeper实现队列源码
实现原理
先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。
队列(Queue)
分布式队列是通用的数据结构,为了在Zookeeper中实现分布式队列,首先需要指定一个Znode节点作为队列节点(queuenode),各个分布式客户端通过调用create()函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用getChildren()函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queuenode)上将watch设置为true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。
应用场景
Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。
详细代码
ZookeeperClient工具类
packageorg.massive.common;
importorg.apache.zookeeper.WatchedEvent;
importorg.apache.zookeeper.Watcher;
importorg.apache.zookeeper.ZooKeeper;
importjava.io.IOException;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.TimeUnit;
/**
*CreatedbyMassiveon2016/12/18.
*/
publicclassZooKeeperClient{
privatestaticStringconnectionString="localhost:2181";
privatestaticintsessionTimeout=10000;
publicstaticZooKeepergetInstance()throwsIOException,InterruptedException{
//--------------------------------------------------------------
//为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode=ConnectionLoss)
//这里等Zookeeper的连接完成才返回实例
//--------------------------------------------------------------
finalCountDownLatchconnectedSignal=newCountDownLatch(1);
ZooKeeperzk=newZooKeeper(connectionString,sessionTimeout,newWatcher(){
@Override
publicvoidprocess(WatchedEventevent){
if(event.getState()==Event.KeeperState.SyncConnected){
connectedSignal.countDown();
}elseif(event.getState()==Event.KeeperState.Expired){
}
}
});
connectedSignal.await(sessionTimeout,TimeUnit.MILLISECONDS);
returnzk;
}
publicstaticintgetSessionTimeout(){
returnsessionTimeout;
}
publicstaticvoidsetSessionTimeout(intsessionTimeout){
ZooKeeperClient.sessionTimeout=sessionTimeout;
}
}
ZooKeeperQueue
packageorg.massive.queue;
importorg.apache.commons.lang3.RandomUtils;
importorg.apache.zookeeper.*;
importorg.apache.zookeeper.data.Stat;
importorg.massive.common.ZooKeeperClient;
importjava.io.IOException;
importjava.io.UnsupportedEncodingException;
importjava.util.List;
importjava.util.SortedSet;
importjava.util.TreeSet;
/**
*CreatedbyAllenon2016/12/22.
*/
publicclassZooKeeperQueue{
privateZooKeeperzk;
privateintsessionTimeout;
privatestaticbyte[]ROOT_QUEUE_DATA={0x12,0x34};
privatestaticStringQUEUE_ROOT="/QUEUE";
privateStringqueueName;
privateStringqueuePath;
privateObjectmutex=newObject();
publicZooKeeperQueue(StringqueueName)throwsIOException,KeeperException,InterruptedException{
this.queueName=queueName;
this.queuePath=QUEUE_ROOT+"/"+queueName;
this.zk=ZooKeeperClient.getInstance();
this.sessionTimeout=zk.getSessionTimeout();
//----------------------------------------------------
//确保队列根目录/QUEUE和当前队列的目录的存在
//----------------------------------------------------
ensureExists(QUEUE_ROOT);
ensureExists(queuePath);
}
publicbyte[]consume()throwsInterruptedException,KeeperException,UnsupportedEncodingException{
Listnodes=null;
byte[]returnVal=null;
Statstat=null;
do{
synchronized(mutex){
nodes=zk.getChildren(queuePath,newProduceWatcher());
//----------------------------------------------------
//如果没有消息节点,等待生产者的通知
//----------------------------------------------------
if(nodes==null||nodes.size()==0){
mutex.wait();
}else{
SortedSetsortedNode=newTreeSet();
for(Stringnode:nodes){
sortedNode.add(queuePath+"/"+node);
}
//----------------------------------------------------
//消费队列里序列号最小的消息
//----------------------------------------------------
Stringfirst=sortedNode.first();
returnVal=zk.getData(first,false,stat);
zk.delete(first,-1);
System.out.print(Thread.currentThread().getName()+"");
System.out.print("consumeamessagefromqueue:"+first);
System.out.println(",messagedatais:"+newString(returnVal,"UTF-8"));
returnreturnVal;
}
}
}while(true);
}
classProduceWatcherimplementsWatcher{
@Override
publicvoidprocess(WatchedEventevent){
//----------------------------------------------------
//生产一条消息成功后通知一个等待线程
//----------------------------------------------------
synchronized(mutex){
mutex.notify();
}
}
}
publicvoidproduce(byte[]data)throwsKeeperException,InterruptedException,UnsupportedEncodingException{
//----------------------------------------------------
//确保当前队列目录存在
//example:/QUEUE/queueName
//----------------------------------------------------
ensureExists(queuePath);
Stringnode=zk.create(queuePath+"/",data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.print(Thread.currentThread().getName()+"");
System.out.print("produceamessagetoqueue:"+node);
System.out.println(",messagedatais:"+newString(data,"UTF-8"));
}
publicvoidensureExists(Stringpath){
try{
Statstat=zk.exists(path,false);
if(stat==null){
zk.create(path,ROOT_QUEUE_DATA,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}catch(KeeperExceptione){
e.printStackTrace();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,KeeperException{
StringqueueName="test";
finalZooKeeperQueuequeue=newZooKeeperQueue(queueName);
for(inti=0;i<10;i++){
newThread(newRunnable(){
@Override
publicvoidrun(){
try{
queue.consume();
System.out.println("--------------------------------------------------------");
System.out.println();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(KeeperExceptione){
e.printStackTrace();
}catch(UnsupportedEncodingExceptione){
e.printStackTrace();
}
}
}).start();
}
newThread(newRunnable(){
@Override
publicvoidrun(){
for(inti=0;i<10;i++){
try{
Thread.sleep(RandomUtils.nextInt(100*i,200*i));
queue.produce(("massive"+i).getBytes());
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(KeeperExceptione){
e.printStackTrace();
}catch(UnsupportedEncodingExceptione){
e.printStackTrace();
}
}
}
},"Produce-thread").start();
}
}
测试
运行main方法,本机器的某次输出结果
Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000000,messagedatais:massive0 Thread-8consumeamessagefromqueue:/QUEUE/test/0000000000,messagedatais:massive0 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000001,messagedatais:massive1 Thread-6consumeamessagefromqueue:/QUEUE/test/0000000001,messagedatais:massive1 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000002,messagedatais:massive2 Thread-3consumeamessagefromqueue:/QUEUE/test/0000000002,messagedatais:massive2 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000003,messagedatais:massive3 Thread-0consumeamessagefromqueue:/QUEUE/test/0000000003,messagedatais:massive3 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000004,messagedatais:massive4 Thread-5consumeamessagefromqueue:/QUEUE/test/0000000004,messagedatais:massive4 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000005,messagedatais:massive5 Thread-2consumeamessagefromqueue:/QUEUE/test/0000000005,messagedatais:massive5 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000006,messagedatais:massive6 Thread-4consumeamessagefromqueue:/QUEUE/test/0000000006,messagedatais:massive6 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000007,messagedatais:massive7 Thread-9consumeamessagefromqueue:/QUEUE/test/0000000007,messagedatais:massive7 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000008,messagedatais:massive8 Thread-7consumeamessagefromqueue:/QUEUE/test/0000000008,messagedatais:massive8 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000009,messagedatais:massive9 Thread-1consumeamessagefromqueue:/QUEUE/test/0000000009,messagedatais:massive9
总结
以上就是本文有关于队列和基于ZooKeeper实现队列源码介绍的全部内容,希望对大家有所帮助。
感谢朋友们对本站的支持!