SCS 在 3.x 做了很大的改动,废除了诸如 @StreamListener、@Input、@Output 等类,保留了 Binder、Binding,并提供了批量消费的支持。 本着学新不学旧的原则,本文将介绍 SCS 3.x 相关内容。 由于关于 spring cloud stream kafka 的文档比较充足,本文就此为例介绍 SCS。
Binder 是提供与外部消息中间件集成的组件,为 Binding 提供了 2 个方法,分别是 bindConsumer 和 bindProducer,它们用于构造生产者和消费者。 Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。
Binder 事务
不要在事务中尝试重试和提交死信。重试时,事务可能已经回归。如果想要提交死信用于善后,那么可以使用 DefaultAfterRollbackProcessor
以在回滚之后提交死信。
Error Channel
binder 会使用 Error Channel 向消费者传递异常,同时可以配置异步生产者发生异常时将异常传递到 Error Channel。
Dead-Letter
默认情况下,某 topic 的死信队列将与原始记录存在于相同分区中。
死信队列中的消息是允许复活的,但是应该避免消息反复消费失败导致多次循环进入死信队列。
应该使用一个专门的处理程序用来对这些死信队列的信息进行善后。
Consumer 消费者
顾名思义,Consumer 定义的是一个消费者,他是一个函数式接口,提供了消费消息的方法。我们可以直接在 Bean 声明中使用 lambda 表达式实现它。
代码语言:javascript复制值得注意的是,Consumer 还是一个泛型接口,通过泛型来绑定消息的类型。接收消息的类型我们会用到 KStream 类,他将与发送消息时定义的 KStream 对应,是键值对组成的抽象记录流,但相同 key 的记录不会被覆盖。
@Bean
public Consumer<KStream<Object, String>> consumer() {
return input -> input.foreach((key, value) -> {
do consume;
});
}
当我们在应用程序中声明返回 Consumer 的 Bean,那么这个 Bean 就会自动接入消息队列。另外,我们需要用到 spring.cloud.stream.bindings.{beanName}-in-{idx}={topic}
来设置订阅的消息主题。默认情况下,topic 与 beanName 同名。
spring.cloud.stream.bindings.consumer-in-0 = userBuy
当接收到消息时,就会调用 Consumer 定义的 accept 方法进行消息消费。
发送消息 生产者
SCS 并没有对发送消息做出一个具体的封装,而是建议通过各个消息队列支持的 client 或者 template 发送消息。
代码语言:javascript复制 kafkaTemplate.send(message);
Function 加工厂
但有时候,我们需要对数据进行加工后发送回消息队列中,这个时候就会用到 Function。
它和 Consumer 类似,但是方法多了一个返回值。同样的,这个返回值需要用到 KStream 类,这样就能够支持将处理完的数据返回到消息队列。
代码语言:javascript复制@Bean
public Function<KStream<Object, String>, KStream<Object, Stream>> processor() {
return input -> input.map((key, value) -> {
do process;
return new KeyValue(key, value);
})
}
spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
来设置出口的消息主题。默认情况下,topic 与 beanName 同名。
Function 相比生产者或消费者,更像是将消息进行加工,这个过程可以对消息进行一系列的处理,包括消息拆分,消息过滤和计算中间结果等。常见的一个用途就是国际化消息和多平台通知。
国际化消息就是对消息进行本地化,Function 就类似一个翻译官的功能,将翻译好的消息转达给消费者。
有时候我们也需要同时对多个平台推送通知,比如邮件、短信等。一般来说,邮件服务器和短信服务器不会写死消息的模板以提高泛用性,这个时候就需要中间人对消息进行加工,嵌入对应平台的模板。
多输出绑定
上面提到了消息拆分,Function 允许多个 topic 的消息发送,返回值上会用到 KStream 数组,然后配置上会用到方才展示的 spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
,idx 代表的就是返回值 KStream 在数组中的索引。
多输入绑定
多输入绑定在普通应用程序上很少用到,一般用于分布式计算。比如除法计算需要同时拥有除数和被除数。分布式计算也是 SCS 的一大用处之一,知识盲区,在此不多做介绍。
KStream
上面多次提到了 KStream,它实质上是一个顺序且可不断增长的数据集,是数据流的一种。
KTable
KTable 与 KStream 类似,但是与 KStream 不同的是,他不允许 key 的重复。 面对相同 key 的数据,会选择更新而不是插入。 KTable 实质上也是数据流,他的实现类同样继承了 AbstractStream。 可以将他看成某一时刻,KStream 的最新快照。