引入依赖
代码语言:javascript复制<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或
代码语言:javascript复制<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
发送(Spring Kafka)
代码语言:javascript复制private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public KafkaController(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@GetMapping("/send")
public Person send(@PathVariable String name) {
Person person = new Person();
person.setId(System.currentTimeMillis());
person.setName(name);
kafkaTemplate.send("test-topic", person);
return person;
}
接收(Spring Kafka)
代码语言:javascript复制 @KafkaListener(topics = "test-topic")
public void consume(Person person){
System.out.println(person.toString());
}
//生产者端错误信息 There was an unexpected error (type=Internal Server Error, status=500). Can't convert value of class com.service.Person to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
消费者端错误信息 nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.service.Person]
KafkaProperties-> Consumer->valueDeserializer
// 解决办法 KafkaProperties-> Producer->valueSerializer
代码语言:javascript复制spring:
kafka:
producer:
valueSerializer: com.service.kafka.ObjectSerializer #加入自定义序列化方式
consumer:
groupId: test
valueDeserializer: com.service.kafka.ObjectDeSerializer
代码语言:javascript复制public class ObjectSerializer implements Serializer<Serializable> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, Serializable serializable) {
System.out.printf("topic:%s, data:%s", s, serializable);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
byte[] bytes = null;
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(serializable);
bytes = bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
@Override
public void close() {
}
}
public class ObjectDeSerializer implements Deserializer<Serializable> {
@Override
public void configure(Map map, boolean b) {
}
@Override
public Serializable deserialize(String s, byte[] bytes) {
ByteArrayInputStream bs = new ByteArrayInputStream(bytes);
Serializable result = null;
try (ObjectInputStream os = new ObjectInputStream(bs)) {
result = (Serializable) os.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
System.out.printf("topic:%s, data:%s", s, result);
return result;
}
@Override
public void close() {
}
}
发送(Spring Cloud Stream Kafka)
代码语言:javascript复制@GetMapping("/stream/{name}")
public Person streamSend(@PathVariable String name){
Person person = new Person();
person.setId(System.currentTimeMillis());
person.setName(name);
MessageChannel mc = source.output();
mc.send(MessageBuilder.withPayload(person).build());
return person;
}
自定义source
代码语言:javascript复制public interface PersonSource {
/**
* Name of the output channel.
*/
String TOPIC = "test-topic";
/**
* @return output channel
*/
@Output(PersonSource.TOPIC)
MessageChannel source();
}
代码语言:javascript复制// 加入注解
@EnableBinding(value = {Source.class,PersonSource.class})
// 将source替换为新定义的personSource
MessageChannel mc = personSource.source();
消费
代码语言:javascript复制// 使用如下方式会报错
@KafkaListener(topics = "test-topic")
public void consume(Person person){
System.out.println(person.toString());
}
代码语言:javascript复制// 如下方式正常
@StreamListener("test-topic")
public void streamConsumer(Person person){
System.out.println(person.toString());
}
是否能通过给数据加入Header的方式解决问题
代码语言:javascript复制mc.send(MessageBuilder.withPayload(person).setHeader("Content-Type","application/bean").build());
通过加入header的方式依然不能反序列化成功.
注意
- 虽然Spring Cloud Stream Binder 中存在Spring Kafka的整合,但是Spring Kafka和Spring Cloud Stream Kafka在处理数据的生产与消费是存在差异的。所以在使用上一定要配套使用。
- 当Spring Cloud Stream Kafka 发送消息包含头信息时,Kafka DeSerializer在实现方法回调的时候并不会处理。
- 一般情况可以通过StreamListener 来监听数据(主体),如果需要处理消息的header信息的话可以通过SubscribableChannel来处理
@Bean
public ApplicationRunner createRunner() {
return (args) -> personSink.input().subscribe(message -> {
MessageHeaders headers = message.getHeaders();
Object obj = message.getPayload();
System.out.printf("receive message, header:%s, body:%s", headers, obj);
});
}
代码语言:javascript复制但是如果上述代码与如下代码同时存在,那么他们会轮流执行
@StreamListener("test-topic")
public void streamConsumer(Person person){
System.out.println(person.toString());
}
Input注解
- 对应 - SubscribableChannel
Output注解
- 对应 - MessageChannel
代码语言:javascript复制两者均屏蔽了具体Stream的具体实现。 无论是
@Input
还是@Output
他们的value
不允许重复(bean
不允许重复),可以通过destination
来申明topic
spring:
cloud:
stream:
bindings:
test-topic-provider:
destination: test-topic
test-topic-consume:
group: test02
destination: test-topic
代码语言:javascript复制 /**
* Name of the output channel.
*/
String TOPIC = "test-topic-provider";
/**
* @return output channel
*/
@Output(PersonSource.TOPIC)
MessageChannel source();
代码语言:javascript复制 /**
* Input channel name.
*/
String INPUT = "test-topic-consume";
/**
* @return input channel.
*/
@Input(INPUT)
SubscribableChannel input();
代码语言:javascript复制@StreamListener(PersonSource.TOPIC)
public void streamConsumer(Person person){
System.out.println(person.toString());
}
SubscribableChannel与@StreamListener
两者实现存在着差异,SubscribableChannel会触发kafka的自定义反序列化,所以Spring Cloud Stream Kafka 是将对象序列化成JSON, 通过JSON反序列化成对象(不经过自定义kafka的Serializer/DeSerializer)。