Kafka消费者接收数据异常,contentType标头始终附加到消息正文

2021-09-06 10:25:14 浏览数 (1)

场景

使用Spring Cloud Stream 1.3.2.RELEASE向Kafka发布String消息。 当使用命令行Kafka使用者或Spring Kafka @KafkaListener使用消息时,contentType标头始终附加到消息正文

kafka生产者,Spring Cloud Stream as producer

代码语言:javascript复制
private void send() {
    channel.test().send(MessageBuilder.withPayload("{"foo":"bar"}").build());
}

kafka消费者,Spring Kafka as consumer

代码语言:javascript复制
@KafkaListener(topics = "test")
public void receive(Message message){
    log.info("Message payload received: {}", message.getPayload());
}

接收日志

代码语言:javascript复制
2018-05-16 07:12:05.241  INFO 19475 --- [ntainer#0-0-C-1] com.demo.service.Listener  : Message payload received: �contentType"text/plain"{"foo":"bar"}
代码语言:javascript复制
2020-09-21 15:15:00.092 INFO [ASYNC_USER_OPERATE][KafkaFetchThread.java:54]: KafkaFetchThread poll message size: 1

2020-09-21 15:15:00.092 INFO [ASYNC_USER_OPERATE][KafkaFetchThread.java:59]: Fetch Kafka Msg : topic=[ASYNC_USER_OPERATE] partition=[2] offset=[0] key=[null] value=[�contentType"application/json"{"blankTask":false,"persistTrace":true,"sendTime":0,"taskDesc":"列表导出","taskParams":{"allNumber":4714}}]

2020-09-21 15:15:00.094 ERROR [ASYNC_USER_OPERATE][MQFetchThread.java:177]: MQFetchThread.parseTask param : �contentType"application/json"{"blankTask":false,"persistTrace":true,"sendTime":0,"taskDesc":"列表导出","taskParams":{"allNumber":4714}]

配置

生产者和消费者的headerMode的默认配置为embeddedHeaders,headerMode设置为raw时,禁用输出上的标题嵌入。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。在非Spring Cloud Stream应用程序生成数据时很有用。

代码语言:javascript复制
spring:
  cloud:
    stream:
      default:
        producer:
          # 禁用输出上的标题嵌入,none
          headermode: raw
        consumer:
          headermode: embeddedHeaders
      kafka:
        binder:
          brokers: kafka:9092

参考

1、在Spring Cloud Stream消息主体中找到嵌入的标头(Embedded headers found in Spring Cloud Stream message body):https://m.656463.com/wenda/zSpringCloudStreamxxztzzdqrdbt_351

2、Spring Cloud Stream Kafka是否支持嵌入式标头?:https://www.javaroad.cn/questions/326728

3、Spring Cloud Stream集成kafka问题 - 消费者接收数据异常:https://www.jianshu.com/p/ac0aeab4550f

4、Spring Cloud Stream-配置选项:https://blog.csdn.net/u010277958/article/details/94083714

5、Spring Cloud Alibaba集成Kafka遇到的坑导致传递对象,消费者读消息内容为空的解决方案:https://blog.csdn.net/bufegar0/article/details/108416509

6、Spring Cloud中通过Kafka传递自定义Header:https://www.cnblogs.com/bobdeng/p/8759351.html

0 人点赞