ReplyingKafkaTemplate源码分析

2023-10-16 19:57:47 浏览数 (2)

代码语言:javascript复制
ReplyingKafkaTemplate是spring-kafka组件提供的一个用于实现请求响应模式的类,基础介绍可以参考文章https://blog.csdn.net/john1337/article/details/131363690,这里不再赘述这些细节,下面看下ReplyingKafkaTemplate是如何实现请求响应模式的。

代码语言:javascript复制
	@Override
	public void afterPropertiesSet() {
		if (!this.schedulerSet && !this.schedulerInitialized) {
            // 初始化ThreadPoolTaskScheduler
			((ThreadPoolTaskScheduler) this.scheduler).initialize();
			this.schedulerInitialized = true;
		}
	}

	@Override
	public synchronized void start() {
		if (!this.running) {
			try {
				afterPropertiesSet();
			}
			catch (Exception e) {
				throw new KafkaException("Failed to initialize", e);
			}
            // 启动GenericMessageListenerContainer,用于后续接收响应数据
			this.replyContainer.start();
			this.running = true;
		}
	}

上面为该类初始化部分,下面看下数据发送接口:

代码语言:javascript复制
	public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
		Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
		// correlationId是本次消息的唯一标识,实际上就是一个random 的uuid组成,这部分代码就不再列出,感兴趣的可以查看ReplyingKafkaTemplate类defaultCorrelationIdStrategy方法
        CorrelationKey correlationId = this.correlationStrategy.apply(record);
		Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
		Headers headers = record.headers();
		boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
		if (!hasReplyTopic && this.replyTopic != null) {
			headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
			if (this.replyPartition != null) {
				headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
			}
		}
		headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
		this.logger.debug(() -> "Sending: "   record   WITH_CORRELATION_ID   correlationId);
        // 创建RequestReplyFuture,并存入futures中
		RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
		this.futures.put(correlationId, future);
		try {
			future.setSendFuture(send(record));
		}
		catch (Exception e) {
			this.futures.remove(correlationId);
			throw new KafkaException("Send failed", e);
		}
        // 启动定时检测,指定时间结束后及时没收到反馈,也结束等待并返回KafkaReplyTimeoutException超时异常
		scheduleTimeout(record, correlationId, replyTimeout == null ? this.defaultReplyTimeout : replyTimeout);
		return future;
	}
代码语言:javascript复制
	private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId, Duration replyTimeout) {
        // 超时检测部分代码
		this.scheduler.schedule(() -> {
			RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
			if (removed != null) {
				this.logger.warn(() -> "Reply timed out for: "   record   WITH_CORRELATION_ID   correlationId);
				if (!handleTimeout(correlationId, removed)) {
					removed.setException(new KafkaReplyTimeoutException("Reply timed out"));
				}
			}
		}, Instant.now().plus(replyTimeout));
	}

上面是超时检测部分超时情况下反馈,下面看下正常接收到响应时代码:

代码语言:javascript复制
	public void onMessage(List<ConsumerRecord<K, R>> data) {
		data.forEach(record -> {
            // 获取kafka_correlationId请求头
			Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
			CorrelationKey correlationId = null;
			if (correlationHeader != null) {
				correlationId = new CorrelationKey(correlationHeader.value());
			}
			if (correlationId == null) {
				this.logger.error(() -> "No correlationId found in reply: "   record
						  " - to use request/reply semantics, the responding server must return the correlation id "
						  " in the '"   this.correlationHeaderName   "' header");
			}
			else {
                // 收到对应反馈数据,这里需要判断是否该数据在来到之前已被超时检测删除
				RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
				CorrelationKey correlationKey = correlationId;
				if (future == null) {
                     // 数据已被超时检测机制删除,记录日志
					logLateArrival(record, correlationId);
				}
				else {
					this.logger.debug(() -> "Received: "   record   WITH_CORRELATION_ID   correlationKey);
                     // 将反馈返回
					future.set(record);
				}
			}
		});
	}

示例代码可以参考:

https://cloud.tencent.com/developer/article/2343889

0 人点赞