基于ZooKeeper实现队列源码
实现原理
先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。
队列(Queue)
分布式队列是通用的数据结构,为了在Zookeeper中实现分布式队列,首先需要指定一个Znode节点作为队列节点(queuenode),各个分布式客户端通过调用create()函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用getChildren()函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queuenode)上将watch设置为true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。
应用场景
Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。
详细代码
ZookeeperClient工具类
packageorg.massive.common; importorg.apache.zookeeper.WatchedEvent; importorg.apache.zookeeper.Watcher; importorg.apache.zookeeper.ZooKeeper; importjava.io.IOException; importjava.util.concurrent.CountDownLatch; importjava.util.concurrent.TimeUnit; /** *CreatedbyMassiveon2016/12/18. */ publicclassZooKeeperClient{ privatestaticStringconnectionString="localhost:2181"; privatestaticintsessionTimeout=10000; publicstaticZooKeepergetInstance()throwsIOException,InterruptedException{ //-------------------------------------------------------------- //为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode=ConnectionLoss) //这里等Zookeeper的连接完成才返回实例 //-------------------------------------------------------------- finalCountDownLatchconnectedSignal=newCountDownLatch(1); ZooKeeperzk=newZooKeeper(connectionString,sessionTimeout,newWatcher(){ @Override publicvoidprocess(WatchedEventevent){ if(event.getState()==Event.KeeperState.SyncConnected){ connectedSignal.countDown(); }elseif(event.getState()==Event.KeeperState.Expired){ } } }); connectedSignal.await(sessionTimeout,TimeUnit.MILLISECONDS); returnzk; } publicstaticintgetSessionTimeout(){ returnsessionTimeout; } publicstaticvoidsetSessionTimeout(intsessionTimeout){ ZooKeeperClient.sessionTimeout=sessionTimeout; } }
ZooKeeperQueue
packageorg.massive.queue; importorg.apache.commons.lang3.RandomUtils; importorg.apache.zookeeper.*; importorg.apache.zookeeper.data.Stat; importorg.massive.common.ZooKeeperClient; importjava.io.IOException; importjava.io.UnsupportedEncodingException; importjava.util.List; importjava.util.SortedSet; importjava.util.TreeSet; /** *CreatedbyAllenon2016/12/22. */ publicclassZooKeeperQueue{ privateZooKeeperzk; privateintsessionTimeout; privatestaticbyte[]ROOT_QUEUE_DATA={0x12,0x34}; privatestaticStringQUEUE_ROOT="/QUEUE"; privateStringqueueName; privateStringqueuePath; privateObjectmutex=newObject(); publicZooKeeperQueue(StringqueueName)throwsIOException,KeeperException,InterruptedException{ this.queueName=queueName; this.queuePath=QUEUE_ROOT+"/"+queueName; this.zk=ZooKeeperClient.getInstance(); this.sessionTimeout=zk.getSessionTimeout(); //---------------------------------------------------- //确保队列根目录/QUEUE和当前队列的目录的存在 //---------------------------------------------------- ensureExists(QUEUE_ROOT); ensureExists(queuePath); } publicbyte[]consume()throwsInterruptedException,KeeperException,UnsupportedEncodingException{ Listnodes=null; byte[]returnVal=null; Statstat=null; do{ synchronized(mutex){ nodes=zk.getChildren(queuePath,newProduceWatcher()); //---------------------------------------------------- //如果没有消息节点,等待生产者的通知 //---------------------------------------------------- if(nodes==null||nodes.size()==0){ mutex.wait(); }else{ SortedSet sortedNode=newTreeSet (); for(Stringnode:nodes){ sortedNode.add(queuePath+"/"+node); } //---------------------------------------------------- //消费队列里序列号最小的消息 //---------------------------------------------------- Stringfirst=sortedNode.first(); returnVal=zk.getData(first,false,stat); zk.delete(first,-1); System.out.print(Thread.currentThread().getName()+""); System.out.print("consumeamessagefromqueue:"+first); System.out.println(",messagedatais:"+newString(returnVal,"UTF-8")); returnreturnVal; } } }while(true); } classProduceWatcherimplementsWatcher{ @Override publicvoidprocess(WatchedEventevent){ //---------------------------------------------------- //生产一条消息成功后通知一个等待线程 //---------------------------------------------------- synchronized(mutex){ mutex.notify(); } } } publicvoidproduce(byte[]data)throwsKeeperException,InterruptedException,UnsupportedEncodingException{ //---------------------------------------------------- //确保当前队列目录存在 //example:/QUEUE/queueName //---------------------------------------------------- ensureExists(queuePath); Stringnode=zk.create(queuePath+"/",data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.print(Thread.currentThread().getName()+""); System.out.print("produceamessagetoqueue:"+node); System.out.println(",messagedatais:"+newString(data,"UTF-8")); } publicvoidensureExists(Stringpath){ try{ Statstat=zk.exists(path,false); if(stat==null){ zk.create(path,ROOT_QUEUE_DATA,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } }catch(KeeperExceptione){ e.printStackTrace(); }catch(InterruptedExceptione){ e.printStackTrace(); } } publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,KeeperException{ StringqueueName="test"; finalZooKeeperQueuequeue=newZooKeeperQueue(queueName); for(inti=0;i<10;i++){ newThread(newRunnable(){ @Override publicvoidrun(){ try{ queue.consume(); System.out.println("--------------------------------------------------------"); System.out.println(); }catch(InterruptedExceptione){ e.printStackTrace(); }catch(KeeperExceptione){ e.printStackTrace(); }catch(UnsupportedEncodingExceptione){ e.printStackTrace(); } } }).start(); } newThread(newRunnable(){ @Override publicvoidrun(){ for(inti=0;i<10;i++){ try{ Thread.sleep(RandomUtils.nextInt(100*i,200*i)); queue.produce(("massive"+i).getBytes()); }catch(InterruptedExceptione){ e.printStackTrace(); }catch(KeeperExceptione){ e.printStackTrace(); }catch(UnsupportedEncodingExceptione){ e.printStackTrace(); } } } },"Produce-thread").start(); } }
测试
运行main方法,本机器的某次输出结果
Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000000,messagedatais:massive0 Thread-8consumeamessagefromqueue:/QUEUE/test/0000000000,messagedatais:massive0 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000001,messagedatais:massive1 Thread-6consumeamessagefromqueue:/QUEUE/test/0000000001,messagedatais:massive1 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000002,messagedatais:massive2 Thread-3consumeamessagefromqueue:/QUEUE/test/0000000002,messagedatais:massive2 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000003,messagedatais:massive3 Thread-0consumeamessagefromqueue:/QUEUE/test/0000000003,messagedatais:massive3 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000004,messagedatais:massive4 Thread-5consumeamessagefromqueue:/QUEUE/test/0000000004,messagedatais:massive4 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000005,messagedatais:massive5 Thread-2consumeamessagefromqueue:/QUEUE/test/0000000005,messagedatais:massive5 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000006,messagedatais:massive6 Thread-4consumeamessagefromqueue:/QUEUE/test/0000000006,messagedatais:massive6 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000007,messagedatais:massive7 Thread-9consumeamessagefromqueue:/QUEUE/test/0000000007,messagedatais:massive7 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000008,messagedatais:massive8 Thread-7consumeamessagefromqueue:/QUEUE/test/0000000008,messagedatais:massive8 -------------------------------------------------------- Produce-threadproduceamessagetoqueue:/QUEUE/test/0000000009,messagedatais:massive9 Thread-1consumeamessagefromqueue:/QUEUE/test/0000000009,messagedatais:massive9
总结
以上就是本文有关于队列和基于ZooKeeper实现队列源码介绍的全部内容,希望对大家有所帮助。
感谢朋友们对本站的支持!