代码语言:javascript复制
ReplyingKafkaTemplate是spring-kafka组件提供的一个用于实现请求响应模式的类,基础介绍可以参考文章https://blog.csdn.net/john1337/article/details/131363690,这里不再赘述这些细节,下面看下ReplyingKafkaTemplate是如何实现请求响应模式的。
代码语言:javascript复制 @Override
public void afterPropertiesSet() {
if (!this.schedulerSet && !this.schedulerInitialized) {
// 初始化ThreadPoolTaskScheduler
((ThreadPoolTaskScheduler) this.scheduler).initialize();
this.schedulerInitialized = true;
}
}
@Override
public synchronized void start() {
if (!this.running) {
try {
afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Failed to initialize", e);
}
// 启动GenericMessageListenerContainer,用于后续接收响应数据
this.replyContainer.start();
this.running = true;
}
}
上面为该类初始化部分,下面看下数据发送接口:
代码语言:javascript复制 public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
// correlationId是本次消息的唯一标识,实际上就是一个random 的uuid组成,这部分代码就不再列出,感兴趣的可以查看ReplyingKafkaTemplate类defaultCorrelationIdStrategy方法
CorrelationKey correlationId = this.correlationStrategy.apply(record);
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
Headers headers = record.headers();
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
if (!hasReplyTopic && this.replyTopic != null) {
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
if (this.replyPartition != null) {
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
}
}
headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
this.logger.debug(() -> "Sending: " record WITH_CORRELATION_ID correlationId);
// 创建RequestReplyFuture,并存入futures中
RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
this.futures.put(correlationId, future);
try {
future.setSendFuture(send(record));
}
catch (Exception e) {
this.futures.remove(correlationId);
throw new KafkaException("Send failed", e);
}
// 启动定时检测,指定时间结束后及时没收到反馈,也结束等待并返回KafkaReplyTimeoutException超时异常
scheduleTimeout(record, correlationId, replyTimeout == null ? this.defaultReplyTimeout : replyTimeout);
return future;
}
代码语言:javascript复制 private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId, Duration replyTimeout) {
// 超时检测部分代码
this.scheduler.schedule(() -> {
RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
if (removed != null) {
this.logger.warn(() -> "Reply timed out for: " record WITH_CORRELATION_ID correlationId);
if (!handleTimeout(correlationId, removed)) {
removed.setException(new KafkaReplyTimeoutException("Reply timed out"));
}
}
}, Instant.now().plus(replyTimeout));
}
上面是超时检测部分超时情况下反馈,下面看下正常接收到响应时代码:
代码语言:javascript复制 public void onMessage(List<ConsumerRecord<K, R>> data) {
data.forEach(record -> {
// 获取kafka_correlationId请求头
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
CorrelationKey correlationId = null;
if (correlationHeader != null) {
correlationId = new CorrelationKey(correlationHeader.value());
}
if (correlationId == null) {
this.logger.error(() -> "No correlationId found in reply: " record
" - to use request/reply semantics, the responding server must return the correlation id "
" in the '" this.correlationHeaderName "' header");
}
else {
// 收到对应反馈数据,这里需要判断是否该数据在来到之前已被超时检测删除
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
CorrelationKey correlationKey = correlationId;
if (future == null) {
// 数据已被超时检测机制删除,记录日志
logLateArrival(record, correlationId);
}
else {
this.logger.debug(() -> "Received: " record WITH_CORRELATION_ID correlationKey);
// 将反馈返回
future.set(record);
}
}
});
}
示例代码可以参考:
https://cloud.tencent.com/developer/article/2343889