代码语言:javascript复制记录,避免重复造轮子。
@Service
@Slf4j
public class KafkaCommonProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息
* @param topic topic
* @param t 消息
* @param <T> 消息类型
* @return 发送结果
*/
public <T> ListenableFuture<SendResult<String, String>> send(String topic, T t) {
final String message = JSON.toJSONString(t);
return kafkaTemplate.send(topic, message);
}
/**
* 发送消息
* @param topic topic
* @param key key
* @param t 消息
* @param <T> 消息类型
* @return 发送结果
*/
public <T> ListenableFuture<SendResult<String, String>> send(String topic, String key, T t) {
final String message = JSON.toJSONString(t);
return kafkaTemplate.send(topic, key, message);
}
}
如果需要回调则可以
代码语言:javascript复制 public void send(String warningMessage) {
log.info(">>>>> Kafka消息发送,topic: {}, Key: {}, message: {}", TOPIC_NAME, TOPIC_NAME, warningMessage);
ListenableFuture<SendResult<String, String>> future = kafkaCommonProducer.send(TOPIC_NAME, TOPIC_NAME, warningMessage);
future.addCallback(
success -> log.info(">>>>> Kafka消息发送成功,{}", success.toString()),
failure -> log.info(">>>>> Kafka消息发送失败,{}", failure.getMessage())
);
}
application.yml 配置如下:
代码语言:javascript复制spring:
application:
name: test-kafka-msg
kafka:
bootstrap-servers: localhost:20902
############如果采用SASL认证的话需要添加以下内容
producer:
properties:
sasl.mechanism: SCRAM-SHA-256
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
Copyright: 采用 知识共享署名4.0 国际许可协议进行许可 Links: https://cloud.tencent.com/developer/article/2020541