场景
使用Spring Cloud Stream 1.3.2.RELEASE向Kafka发布String消息。 当使用命令行Kafka使用者或Spring Kafka @KafkaListener
使用消息时,contentType标头始终附加到消息正文
kafka生产者,Spring Cloud Stream as producer
代码语言:javascript复制private void send() {
channel.test().send(MessageBuilder.withPayload("{"foo":"bar"}").build());
}
kafka消费者,Spring Kafka as consumer
代码语言:javascript复制@KafkaListener(topics = "test")
public void receive(Message message){
log.info("Message payload received: {}", message.getPayload());
}
接收日志
代码语言:javascript复制2018-05-16 07:12:05.241 INFO 19475 --- [ntainer#0-0-C-1] com.demo.service.Listener : Message payload received: �contentType"text/plain"{"foo":"bar"}
代码语言:javascript复制2020-09-21 15:15:00.092 INFO [ASYNC_USER_OPERATE][KafkaFetchThread.java:54]: KafkaFetchThread poll message size: 1
2020-09-21 15:15:00.092 INFO [ASYNC_USER_OPERATE][KafkaFetchThread.java:59]: Fetch Kafka Msg : topic=[ASYNC_USER_OPERATE] partition=[2] offset=[0] key=[null] value=[�contentType"application/json"{"blankTask":false,"persistTrace":true,"sendTime":0,"taskDesc":"列表导出","taskParams":{"allNumber":4714}}]
2020-09-21 15:15:00.094 ERROR [ASYNC_USER_OPERATE][MQFetchThread.java:177]: MQFetchThread.parseTask param : �contentType"application/json"{"blankTask":false,"persistTrace":true,"sendTime":0,"taskDesc":"列表导出","taskParams":{"allNumber":4714}]
配置
生产者和消费者的headerMode
的默认配置为embeddedHeaders,
headerMode设置为raw
时,禁用输出上的标题嵌入。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。在非Spring Cloud Stream应用程序生成数据时很有用。
spring:
cloud:
stream:
default:
producer:
# 禁用输出上的标题嵌入,none
headermode: raw
consumer:
headermode: embeddedHeaders
kafka:
binder:
brokers: kafka:9092
参考
1、在Spring Cloud Stream消息主体中找到嵌入的标头(Embedded headers found in Spring Cloud Stream message body):https://m.656463.com/wenda/zSpringCloudStreamxxztzzdqrdbt_351
2、Spring Cloud Stream Kafka是否支持嵌入式标头?:https://www.javaroad.cn/questions/326728
3、Spring Cloud Stream集成kafka问题 - 消费者接收数据异常:https://www.jianshu.com/p/ac0aeab4550f
4、Spring Cloud Stream-配置选项:https://blog.csdn.net/u010277958/article/details/94083714
5、Spring Cloud Alibaba集成Kafka遇到的坑导致传递对象,消费者读消息内容为空的解决方案:https://blog.csdn.net/bufegar0/article/details/108416509
6、Spring Cloud中通过Kafka传递自定义Header:https://www.cnblogs.com/bobdeng/p/8759351.html