点击上方"IT牧场",选择"设为星标"技术干货每日送达!
TIPS 本文基于Spring Cloud Greenwich SR1,理论支持Finchley及更高版本。
本节详细探讨Spring Cloud Stream的错误处理。
应用处理
局部处理(通用)
配置:
代码语言:javascript复制spring:
cloud:
stream:
bindings:
input:
destination: my-destination
group: my-group
output:
destination: my-destination
代码:
代码语言:javascript复制@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableScheduling
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@ServiceActivator(inputChannel = "my-destination.my-group.errors")
public void handleError(ErrorMessage message) {
Throwable throwable = message.getPayload();
log.error("截获异常", throwable);
Message<?> originalMessage = message.getOriginalMessage();
assert originalMessage != null;
log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload()));
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>("adfdfdsafdsfa");
}
}
全局处理(通用)
代码语言:javascript复制@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
ErrorMessage errorMessage = (ErrorMessage) message;
System.out.println("Handling ERROR: " errorMessage);
}
系统处理
系统处理方式,因消息中间件不同而异。如果应用没有配置错误处理,那么error将会被传播给binder,binder将error回传给消息中间件。消息中间件可以丢弃消息、requeue(重新排队,从而重新处理)或将失败的消息发送给DLQ(死信队列)。
丢弃
默认情况下,错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产。
DLQ(RabbitMQ)
TIPS •虽然RocketMQ也支持DLQ,但目前RocketMQ控制台并不支持在界面上操作,将死信放回消息队列,让客户端重新处理。所以使用很不方便,而且用法也和本节有一些差异。•如使用RocketMQ,建议参考上面应用处理一节的用法,也可额外订阅这个Topic
%DLQ% consumerGroup
•个人给RocketMQ控制台提的Issue:https://github.com/apache/rocketmq/issues/1334
配置:
代码语言:javascript复制spring:
cloud:
stream:
bindings:
input:
destination: my-destination
group: my-group
output:
destination: my-destination
rabbit:
bindings:
input:
consumer:
auto-bind-dlq: true
代码:
代码语言:javascript复制@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>("adfdfdsafdsfa");
}
这样,消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。
如果想获取原始错误的异常堆栈,可添加如下配置:
代码语言:javascript复制spring:
cloud:
stream:
rabbit:
bindings:
input:
consumer:
republish-to-dlq: true
requeue(RabbitMQ)
Rabbit/Kafka的binder依赖RetryTemplate实现重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1
,那么RetryTemplate则不再重试。此时可通过requeue方式处理异常。
添加如下配置:
代码语言:javascript复制# 默认是3,设为1则禁用重试
spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException
异常为止。
RetryTemplate(通用)
配置方式
RetryTemplate重试也是错误处理的一种手段。
代码语言:javascript复制spring:
cloud:
stream:
bindings:
<input channel名称>:
consumer:
# 最多尝试处理几次,默认3
maxAttempts: 3
# 重试时初始避退间隔,单位毫秒,默认1000
backOffInitialInterval: 1000
# 重试时最大避退间隔,单位毫秒,默认10000
backOffMaxInterval: 10000
# 避退乘数,默认2.0
backOffMultiplier: 2.0
# 当listen抛出retryableExceptions未列出的异常时,是否要重试
defaultRetryable: true
# 异常是否允许重试的map映射
retryableExceptions:
java.lang.RuntimeException: true
java.lang.IllegalStateException: false
测试代码:
代码语言:javascript复制@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException(body);
}
private AtomicInteger count = new AtomicInteger(0);
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>(count.getAndAdd(1) "");
}
编码方式
多数场景下,使用配置方式定制重试行为都是可以满足需求的,但配置方式可能无法满足一些复杂需求。此时可使用编码方式配置RetryTemplate:
代码语言:javascript复制@Configuration
class RetryConfiguration {
@StreamRetryTemplate
public RetryTemplate sinkConsumerRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
private ExceptionClassifierRetryPolicy retryPolicy() {
BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
Collections.singletonList(IllegalAccessException.class
));
keepRetryingClassifier.setTraverseCauses(true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
retryPolicy.setExceptionClassifier(
classifiable -> keepRetryingClassifier.classify(classifiable) ?
alwaysRetryPolicy : simpleRetryPolicy);
return retryPolicy;
}
private FixedBackOffPolicy backOffPolicy() {
final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2);
return backOffPolicy;
}
}
然后添加配置:
代码语言:javascript复制spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate
注意: Spring Cloud Stream 2.2才支持设置retry-template-name
干货分享
最近将个人学习笔记整理成册,使用PDF分享。关注我,回复如下代码,即可获得百度盘地址,无套路领取! •001:《Java并发与高并发解决方案》学习笔记;•002:《深入JVM内核——原理、诊断与优化》学习笔记;•003:《Java面试宝典》•004:《Docker开源书》•005:《Kubernetes开源书》•006:《DDD速成(领域驱动设计速成)》•007:全部•008:加技术讨论群