Spring Cloud Stream异常处理过程解析
应用处理
当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常。若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理、系统层面的处理以及通过RetryTemplate进行处理。
本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理。
局部处理
Stream相关的配置内容如下:
spring: cloud: stream: rocketmq: binder: name-server:192.168.190.129:9876 bindings: input: destination:stream-test-topic group:binder-group
所谓局部处理就是针对指定的channel进行处理,需要定义一个处理异常的方法,并在该方法上添加@ServiceActivator注解,该注解有一个inputChannel属性,用于指定对哪个channel进行处理,格式为{destination}.{group}.errors。具体代码如下:
packagecom.zj.node.usercenter.rocketmq; importlombok.extern.slf4j.Slf4j; importorg.springframework.cloud.stream.annotation.StreamListener; importorg.springframework.cloud.stream.messaging.Sink; importorg.springframework.integration.annotation.ServiceActivator; importorg.springframework.messaging.Message; importorg.springframework.messaging.support.ErrorMessage; importorg.springframework.stereotype.Service; /** *消费者 * *@author01 *@date2019-08-10 **/ @Slf4j @Service publicclassTestStreamConsumer{ @StreamListener(Sink.INPUT) publicvoidreceive1(StringmessageBody){ log.info("消费消息,messageBody={}",messageBody); thrownewIllegalArgumentException("参数错误"); } /** *处理局部异常的方法 * *@paramerrorMessage异常消息对象 */ @ServiceActivator( //通过特定的格式指定处理哪个channel的异常 inputChannel="stream-test-topic.binder-group.errors" ) publicvoidhandleError(ErrorMessageerrorMessage){ //获取异常对象 ThrowableerrorMessagePayload=errorMessage.getPayload(); log.error("发生异常",errorMessagePayload); //获取消息体 Message>originalMessage=errorMessage.getOriginalMessage(); if(originalMessage!=null){ log.error("消息体:{}",originalMessage.getPayload()); }else{ log.error("消息体为空"); } } }
全局处理
全局处理则是可以处理所有channel抛出来的异常,所有的channel抛出异常后会生成一个ErrorMessage对象,即错误消息。错误消息会被放到一个专门的channel里,这个channel就是errorChannel。所以通过监听errorChannel就可以实现全局异常的处理。具体代码如下:
@StreamListener(Sink.INPUT) publicvoidreceive1(StringmessageBody){ log.info("消费消息,messageBody={}",messageBody); thrownewIllegalArgumentException("参数错误"); } /** *处理全局异常的方法 * *@paramerrorMessage异常消息对象 */ @StreamListener("errorChannel") publicvoidhandleError(ErrorMessageerrorMessage){ log.error("发生异常.errorMessage={}",errorMessage); }
系统处理
系统处理方式,因消息中间件的不同而异。如果应用层面没有配置错误处理,那么error将会被传播给binder,而binder则会将error回传给消息中间件。消息中间件可以选择:
- 丢弃消息:错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产
- requeue(重新排队,从而重新处理)
- 将失败的消息发送给DLQ(死信队列)
DLQ
目前RabbitMQ对DLQ的支持比较好,这里以RabbitMQ为例,只需要添加DLQ相关的配置:
spring: cloud: stream: bindings: input: destination:stream-test-topic group:binder-group rabbit: bindings: input: consumer: #自动将失败的消息发送给DLQ auto-bind-dlq:true
消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。
如果想获取原始错误的异常堆栈,可添加如下配置:
spring: cloud: stream: rabbit: bindings: input: consumer: republish-to-dlq:true
requeue
Rabbit及Kafka的binder依赖RetryTemplate实现消息重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1,那么RetryTemplate则不会再重试。此时可以通过requeue方式来处理异常。
需要添加如下配置:
#默认是3,设为1则禁用重试 spring.cloud.stream.bindings..consumer.max-attempts=1 #表示是否要requeue被拒绝的消息(即:requeue处理失败的消息) spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出AmqpRejectAndDontRequeueException异常为止。
RetryTemplate
RetryTemplate主要用于实现消息重试,也是错误处理的一种手段。有两种配置方式,一种是通过配置文件进行配置,如下示例:
spring: cloud: stream: bindings:: consumer: #最多尝试处理几次,默认3 maxAttempts:3 #重试时初始避退间隔,单位毫秒,默认1000 backOffInitialInterval:1000 #重试时最大避退间隔,单位毫秒,默认10000 backOffMaxInterval:10000 #避退乘数,默认2.0 backOffMultiplier:2.0 #当listen抛出retryableExceptions未列出的异常时,是否要重试 defaultRetryable:true #异常是否允许重试的map映射 retryableExceptions: java.lang.RuntimeException:true java.lang.IllegalStateException:false
另一种则是通过代码配置,在多数场景下,使用配置文件定制重试行为都是可以满足需求的,但配置文件里支持的配置项可能无法满足一些复杂需求。此时可使用代码方式配置RetryTemplate,如下示例:
@Configuration classRetryConfiguration{ @StreamRetryTemplate publicRetryTemplatesinkConsumerRetryTemplate(){ RetryTemplateretryTemplate=newRetryTemplate(); retryTemplate.setRetryPolicy(retryPolicy()); retryTemplate.setBackOffPolicy(backOffPolicy()); returnretryTemplate; } privateExceptionClassifierRetryPolicyretryPolicy(){ BinaryExceptionClassifierkeepRetryingClassifier=newBinaryExceptionClassifier( Collections.singletonList(IllegalAccessException.class )); keepRetryingClassifier.setTraverseCauses(true); SimpleRetryPolicysimpleRetryPolicy=newSimpleRetryPolicy(3); AlwaysRetryPolicyalwaysRetryPolicy=newAlwaysRetryPolicy(); ExceptionClassifierRetryPolicyretryPolicy=newExceptionClassifierRetryPolicy(); retryPolicy.setExceptionClassifier( classifiable->keepRetryingClassifier.classify(classifiable)? alwaysRetryPolicy:simpleRetryPolicy); returnretryPolicy; } privateFixedBackOffPolicybackOffPolicy(){ finalFixedBackOffPolicybackOffPolicy=newFixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2); returnbackOffPolicy; } }
最后还需要添加一段配置:
spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate
注:SpringCloudStream2.2才支持设置retry-template-name
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。