RabbitMQ 最常用的三大模式实例解析
这篇文章主要介绍了RabbitMQ最常用的三大模式实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
Direct模式
- 所有发送到DirectExchange的消息被转发到RouteKey中指定的Queue。
- Direct模式可以使用RabbitMQ自带的Exchange:defaultExchange,所以不需要将Exchange进行任何绑定(binding)操作。
- 消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃,
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
publicclassDirectProducer{
publicstaticvoidmain(String[]args)throwsException{
//1.创建一个ConnectionFactory并进行设置
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2.通过连接工厂来创建连接
Connectionconnection=factory.newConnection();
//3.通过Connection来创建Channel
Channelchannel=connection.createChannel();
//4.声明
StringexchangeName="test_direct_exchange";
StringroutingKey="item.direct";
//5.发送
Stringmsg="thisisdirectmsg";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
System.out.println("Sendmessage:"+msg);
//6.关闭连接
channel.close();
connection.close();
}
}
importcom.rabbitmq.client.*;
importjava.io.IOException;
publicclassDirectConsumer{
publicstaticvoidmain(String[]args)throwsException{
//1.创建一个ConnectionFactory并进行设置
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2.通过连接工厂来创建连接
Connectionconnection=factory.newConnection();
//3.通过Connection来创建Channel
Channelchannel=connection.createChannel();
//4.声明
StringexchangeName="test_direct_exchange";
StringqueueName="test_direct_queue";
StringroutingKey="item.direct";
channel.exchangeDeclare(exchangeName,"direct",true,false,null);
channel.queueDeclare(queueName,false,false,false,null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName,exchangeName,routingKey);
//5.创建消费者并接收消息
Consumerconsumer=newDefaultConsumer(channel){
@Override
publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,
AMQP.BasicPropertiesproperties,byte[]body)
throwsIOException{
Stringmessage=newString(body,"UTF-8");
System.out.println("[x]Received'"+message+"'");
}
};
//6.设置Channel消费者绑定队列
channel.basicConsume(queueName,true,consumer);
}
}
Sendmessage:thisisdirectmsg [x]Received'thisisdirectmsg'
Topic模式
可以使用通配符进行模糊匹配
- 符号'#"匹配一个或多个词
- 符号"*”匹配不多不少一个词
例如
- 'log.#"能够匹配到'log.info.oa"
- "log.*"只会匹配到"log.erro“
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
publicclassTopicProducer{
publicstaticvoidmain(String[]args)throwsException{
//1.创建一个ConnectionFactory并进行设置
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2.通过连接工厂来创建连接
Connectionconnection=factory.newConnection();
//3.通过Connection来创建Channel
Channelchannel=connection.createChannel();
//4.声明
StringexchangeName="test_topic_exchange";
StringroutingKey1="item.update";
StringroutingKey2="item.delete";
StringroutingKey3="user.add";
//5.发送
Stringmsg="thisistopicmsg";
channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());
System.out.println("Sendmessage:"+msg);
//6.关闭连接
channel.close();
connection.close();
}
}
importcom.rabbitmq.client.*;
importjava.io.IOException;
publicclassTopicConsumer{
publicstaticvoidmain(String[]args)throwsException{
//1.创建一个ConnectionFactory并进行设置
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2.通过连接工厂来创建连接
Connectionconnection=factory.newConnection();
//3.通过Connection来创建Channel
Channelchannel=connection.createChannel();
//4.声明
StringexchangeName="test_topic_exchange";
StringqueueName="test_topic_queue";
StringroutingKey="item.#";
channel.exchangeDeclare(exchangeName,"topic",true,false,null);
channel.queueDeclare(queueName,false,false,false,null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName,exchangeName,routingKey);
//5.创建消费者并接收消息
Consumerconsumer=newDefaultConsumer(channel){
@Override
publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,
AMQP.BasicPropertiesproperties,byte[]body)
throwsIOException{
Stringmessage=newString(body,"UTF-8");
System.out.println("[x]Received'"+message+"'");
}
};
//6.设置Channel消费者绑定队列
channel.basicConsume(queueName,true,consumer);
}
}
Sendmessage:thisistopcmsg [x]Received'thisistopcmsg' [x]Received'thisistopcmsg'
Fanout模式
不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
Fanout交换机转发消息是最快的。
importcom.rabbitmq.client.*;
importjava.io.IOException;
publicclassFanoutConsumer{
publicstaticvoidmain(String[]args)throwsException{
//1.创建一个ConnectionFactory并进行设置
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2.通过连接工厂来创建连接
Connectionconnection=factory.newConnection();
//3.通过Connection来创建Channel
Channelchannel=connection.createChannel();
//4.声明
StringexchangeName="test_fanout_exchange";
StringqueueName="test_fanout_queue";
StringroutingKey="item.#";
channel.exchangeDeclare(exchangeName,"fanout",true,false,null);
channel.queueDeclare(queueName,false,false,false,null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName,exchangeName,routingKey);
//5.创建消费者并接收消息
Consumerconsumer=newDefaultConsumer(channel){
@Override
publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,
AMQP.BasicPropertiesproperties,byte[]body)
throwsIOException{
Stringmessage=newString(body,"UTF-8");
System.out.println("[x]Received'"+message+"'");
}
};
//6.设置Channel消费者绑定队列
channel.basicConsume(queueName,true,consumer);
}
}
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
publicclassFanoutProducer{
publicstaticvoidmain(String[]args)throwsException{
//1.创建一个ConnectionFactory并进行设置
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2.通过连接工厂来创建连接
Connectionconnection=factory.newConnection();
//3.通过Connection来创建Channel
Channelchannel=connection.createChannel();
//4.声明
StringexchangeName="test_fanout_exchange";
StringroutingKey1="item.update";
StringroutingKey2="";
StringroutingKey3="ookjkjjkhjhk";//任意routingkey
//5.发送
Stringmsg="thisisfanoutmsg";
channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());
System.out.println("Sendmessage:"+msg);
//6.关闭连接
channel.close();
connection.close();
}
}
Sendmessage:thisisfanoutmsg [x]Received'thisisfanoutmsg' [x]Received'thisisfanoutmsg' [x]Received'thisisfanoutmsg'
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。