RabbitMQ消息中间件示例详解
前言
RabbitMQ是使用Erlang语言开发的消息中间件,其遵循了高级消息队列协议(AdvancedMessageQueuingProtocol,AMQP)。
与Kafka等消息队列相比,RabbitMQ最大的优势在于其较高的可靠性:
- 提供确认(ACK)和重传机制保证消息完成消费,消费者异常不会导致消息丢失
- 提供消息持久化机制,broker崩溃不会导致消息丢失
- 集群模式下工作,保证高可用
因为具有较高可靠性和一致性,RabbitMQ可以胜任订单处理、秒杀等一致性要求较高的业务场景。
RabbitMQ概念与机制
RabbitMQ中的概念模型:
- Broker:消息中间件实例,可能是单个节点也可能是运行在多节点集群上的逻辑实体
- 消息(Message):消息由消息头和消息体两部分组成。消息头中包括routing-key、priority等标准消息头以及其它自定义消息头,用于定义RabbitMQ对消息行为。消息体是字节流,包含消息内容。
- 连接(Connection):客户端与Broker之间的TCP连接
- 信道(Channel):Channel是建立在TCP连接上的逻辑(虚拟)连接。多个Channel复用同一个TCP连接,以避免建立TCP连接的巨大开销。RabbitMQ官方要求每个线程使用独立的Channel,禁止多个线程共用Channel。
- 生产者(Publisher):发送消息的客户端线程
- 消费者(Consumer):处理消息的客户端线程
- 交换机(Exchange):交换机负责将消息投递到相应的队列
- 队列(Queue):接收并保存交换机投递的消息,直至被消费者成功消费。逻辑结构遵循先进先出FIFO。
- 绑定(Binding):将队列(Queue)注册到交换机(Exchange)的路由表
- 虚拟主机(Vhost):每个Broker下可建立多个vhost,每个vhost可建立独立的Exchange、Queue、绑定及权限系统。同一个Broker下的vhost共享Connection、Channel和用户系统,就是说可以使用同一个用户身份使用同一个Channel访问不同vhost。
交换机(Exchange)
生产者发送的消息会首先送到交换机(Exchange),交换机根据自身类型和消息的routing-key等信息将消息投递到绑定的消息队列中。
RabbitMQ中的四种标准交换机:
direct:如果消息的routing-key与队列的binding-key完全相同,direct类型的交换机则会将消息投递到该队列中。
- 多个队列可以使用相同的binding-key绑定到同一个direct交换机,direct交换机会把消息投递到所有binding-key与消息routing-key相同的队列
topic:允许队列的binding-key中包含通配符*和#,topic交换机会将消息投递到binding-key与routing-key匹配的队列中。
- 通配符按照关键字进行匹配,如news.cn.a中的关键字是news、cn和a,即关键字按照.分割
- #通配符匹配0个或多个关键字,news.#.a可以匹配news.a,news.cn.a和news.asia.cn.a等
- *通配符匹配一个关键字,news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a
fanout:fanout交换机不进行任何匹配,将消息投递到所有绑定的队列
header:header交换机根据消息头进行投递,现在已较少使用
我们可以使用RabbitMQ的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的delayed-message-exchange。
消息头中的delivery-mode可以设置为persistent(持久化)或者transient(易失)。Exchange和Queue在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理,即使RabbitMQ崩溃也不会丢失。
消费者客户端通常使用的channel.basicConsume使用推(push)模式投递消息,即当有新消息时Broker通过channel主动向客户端发送消息。客户端也可以使用channel.basicGet从Broker拉取消息。
ACK机制
RabbitMQ提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失。
确认送达的回执有三种:
- ACK:消息已被成功处理
- NACK:消息处理异常,需要重新投递
- REJECT:消息非法,丢弃消息
RabbitMQ的Queue可以设置no_ack=true,则消息被投递后即删除不等待回执。
channel.basicConsume可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。
Java代码示例
首先安装并启动RabbitMQ实例,Mac用户可以使用Homebrew进行安装:
brewinstallrabbitmq
启动服务:
brewservicesstartrabbitmq
或者使用官方docker镜像:
dockerrun-d--hostnamemy-rabbit--namesome-rabbitrabbitmq:3-management
RabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。
RabbitMQ默认TCP端口为5672,Web控制台默认端口15672。
在Maven中添加依赖:
com.rabbitmq amqp-client 5.5.1
编写生产者:
packagerabbit; importjava.io.IOException; importjava.util.concurrent.TimeoutException; importcom.rabbitmq.client.AMQP; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.ConnectionFactory; /** *@authorfinley */ publicclassRabbitProducer{ publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{ ConnectionFactoryfactory=newConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try(Connectionconn=factory.newConnection(); Channelchannel=conn.createChannel()){ StringexchangeName="test-exchange"; channel.exchangeDeclare(exchangeName,"direct",true); StringroutingKey="hello"; byte[]msg="helloworld".getBytes(); AMQP.BasicProperties.BuilderpropsBuilder=newAMQP.BasicProperties.Builder(); propsBuilder.deliveryMode(2);//persistent propsBuilder.priority(0);//normal propsBuilder.contentType("text/plain"); channel.basicPublish(exchangeName,routingKey,propsBuilder.build(),msg); } } }
编写消费者:
packagerabbit; importjava.io.IOException; importjava.util.concurrent.TimeoutException; importcom.rabbitmq.client.*; /** *@authorfinley */ publicclassRabbitConsumer{ publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{ ConnectionFactoryfactory=newConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try(Connectionconn=factory.newConnection(); Channelchannel=conn.createChannel()){ StringexchangeName="test-exchange"; channel.exchangeDeclare(exchangeName,"direct",true); StringqueueName=channel.queueDeclare().getQueue(); StringbindingKey="hello"; channel.queueBind(queueName,exchangeName,bindingKey); while(true){ channel.basicConsume(queueName,false,"",newDefaultConsumer(channel){ @Override publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{ StringroutingKey=envelope.getRoutingKey(); StringcontentType=properties.getContentType(); StringbodyStr=newString(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",contentType:"+contentType+",body:"+bodyStr); longdeliveryTag=envelope.getDeliveryTag(); channel.basicAck(deliveryTag,false); } }); } } } }
RabbitMQ的消息为字节,可以将Java对象序列化后作为消息体发送。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。