A Look at RocketMQClientTemplate in RocketMQ 5

2024-07-27 12:25:58

This article takes a look at RocketMQClientTemplate in RocketMQ 5.

rocketmq-spring

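Since RocketMQ released version 5, rocketmq-spring has been split into a 4.x line and a 5.x line: the 4.x modules are prefixed with rocketmq-spring, while the 5.x modules are prefixed with rocketmq-v5-client. RocketMQClientTemplate in RocketMQ 5 is the counterpart of the older RocketMQTemplate. Both extend org.springframework.messaging.core.AbstractMessageSendingTemplate; what differs is the set of methods each one adds on top.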

RocketMQTemplate

Its main methods are:

  • sendAndReceive
  • syncSend, syncSendDelayTimeSeconds, syncSendDelayTimeMills, syncSendDeliverTimeMills, syncSendOrderly
  • asyncSend, asyncSendOrderly
  • sendOneWay, sendOneWayOrderly
  • sendMessageInTransaction

RocketMQClientTemplate

Its main methods are:

  • syncSendNormalMessage, syncSendFifoMessage, syncSendDelayMessage
  • asyncSendNormalMessage, asyncSendFifoMessage, asyncSendDelayMessage
  • asyncSendWithObjectPayload, asyncSendWithStringPayload, asyncSendWithBytePayload, asyncSendWithMessagePayload
  • sendMessageInTransaction
  • receive, receiveAsync
  • ack, ackAsync

MessageSendingOperations

org/springframework/messaging/core/MessageSendingOperations.java

public interface MessageSendingOperations<D> {

  /**
   * Send a message to a default destination.
   * @param message the message to send
   */
  void send(Message<?> message) throws MessagingException;

  /**
   * Send a message to the given destination.
   * @param destination the target destination
   * @param message the message to send
   */
  void send(D destination, Message<?> message) throws MessagingException;

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link org.springframework.messaging.converter.MessageConverter},
   * wrap it as a message and send it to a default destination.
   * @param payload the Object to use as payload
   */
  void convertAndSend(Object payload) throws MessagingException;

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link org.springframework.messaging.converter.MessageConverter},
   * wrap it as a message and send it to the given destination.
   * @param destination the target destination
   * @param payload the Object to use as payload
   */
  void convertAndSend(D destination, Object payload) throws MessagingException;

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link org.springframework.messaging.converter.MessageConverter},
   * wrap it as a message with the given headers and send it to
   * the given destination.
   * @param destination the target destination
   * @param payload the Object to use as payload
   * @param headers the headers for the message to send
   */
  void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException;

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link org.springframework.messaging.converter.MessageConverter},
   * wrap it as a message, apply the given post processor, and send
   * the resulting message to a default destination.
   * @param payload the Object to use as payload
   * @param postProcessor the post processor to apply to the message
   */
  void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link org.springframework.messaging.converter.MessageConverter},
   * wrap it as a message, apply the given post processor, and send
   * the resulting message to the given destination.
   * @param destination the target destination
   * @param payload the Object to use as payload
   * @param postProcessor the post processor to apply to the message
   */
  void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor) throws MessagingException;

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link org.springframework.messaging.converter.MessageConverter},
   * wrap it as a message with the given headers, apply the given post processor,
   * and send the resulting message to the given destination.
   * @param destination the target destination
   * @param payload the Object to use as payload
   * @param headers the headers for the message to send
   * @param postProcessor the post processor to apply to the message
   */
  void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
      @Nullable MessagePostProcessor postProcessor) throws MessagingException;

}

The MessageSendingOperations interface in spring-messaging defines the send and convertAndSend methods.

AbstractMessageSendingTemplate

org/springframework/messaging/core/AbstractMessageSendingTemplate.java

public abstract class AbstractMessageSendingTemplate<D> implements MessageSendingOperations<D> {

  /**
   * Name of the header that can be set to provide further information
   * (e.g. a {@code MethodParameter} instance) about the origin of the
   * payload, to be taken into account as a conversion hint.
   * @since 4.2
   */
  public static final String CONVERSION_HINT_HEADER = "conversionHint";


  protected final Log logger = LogFactory.getLog(getClass());

  @Nullable
  private D defaultDestination;

  private MessageConverter converter = new SimpleMessageConverter();


  /**
   * Configure the default destination to use in send methods that don't have
   * a destination argument. If a default destination is not configured, send methods
   * without a destination argument will raise an exception if invoked.
   */
  public void setDefaultDestination(@Nullable D defaultDestination) {
    this.defaultDestination = defaultDestination;
  }

  /**
   * Return the configured default destination.
   */
  @Nullable
  public D getDefaultDestination() {
    return this.defaultDestination;
  }

  /**
   * Set the {@link MessageConverter} to use in {@code convertAndSend} methods.
   * <p>By default, {@link SimpleMessageConverter} is used.
   * @param messageConverter the message converter to use
   */
  public void setMessageConverter(MessageConverter messageConverter) {
    Assert.notNull(messageConverter, "MessageConverter must not be null");
    this.converter = messageConverter;
  }

  /**
   * Return the configured {@link MessageConverter}.
   */
  public MessageConverter getMessageConverter() {
    return this.converter;
  }


  @Override
  public void send(Message<?> message) {
    send(getRequiredDefaultDestination(), message);
  }

  protected final D getRequiredDefaultDestination() {
    Assert.state(this.defaultDestination != null, "No 'defaultDestination' configured");
    return this.defaultDestination;
  }

  @Override
  public void send(D destination, Message<?> message) {
    doSend(destination, message);
  }

  protected abstract void doSend(D destination, Message<?> message);


  @Override
  public void convertAndSend(Object payload) throws MessagingException {
    convertAndSend(payload, null);
  }

  @Override
  public void convertAndSend(D destination, Object payload) throws MessagingException {
    convertAndSend(destination, payload, (Map<String, Object>) null);
  }

  @Override
  public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers)
      throws MessagingException {

    convertAndSend(destination, payload, headers, null);
  }

  @Override
  public void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor)
      throws MessagingException {

    convertAndSend(getRequiredDefaultDestination(), payload, postProcessor);
  }

  @Override
  public void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor)
      throws MessagingException {

    convertAndSend(destination, payload, null, postProcessor);
  }

  @Override
  public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
      @Nullable MessagePostProcessor postProcessor) throws MessagingException {

    Message<?> message = doConvert(payload, headers, postProcessor);
    send(destination, message);
  }

  /**
   * Convert the given Object to serialized form, possibly using a
   * {@link MessageConverter}, wrap it as a message with the given
   * headers and apply the given post processor.
   * @param payload the Object to use as payload
   * @param headers the headers for the message to send
   * @param postProcessor the post processor to apply to the message
   * @return the converted message
   */
  protected Message<?> doConvert(Object payload, @Nullable Map<String, Object> headers,
      @Nullable MessagePostProcessor postProcessor) {

    MessageHeaders messageHeaders = null;
    Object conversionHint = (headers != null ? headers.get(CONVERSION_HINT_HEADER) : null);

    Map<String, Object> headersToUse = processHeadersToSend(headers);
    if (headersToUse != null) {
      if (headersToUse instanceof MessageHeaders) {
        messageHeaders = (MessageHeaders) headersToUse;
      }
      else {
        messageHeaders = new MessageHeaders(headersToUse);
      }
    }

    MessageConverter converter = getMessageConverter();
    Message<?> message = (converter instanceof SmartMessageConverter ?
        ((SmartMessageConverter) converter).toMessage(payload, messageHeaders, conversionHint) :
        converter.toMessage(payload, messageHeaders));
    if (message == null) {
      String payloadType = payload.getClass().getName();
      Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
      throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
          "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
    }
    if (postProcessor != null) {
      message = postProcessor.postProcessMessage(message);
    }
    return message;
  }

  /**
   * Provides access to the map of input headers before a send operation.
   * Subclasses can modify the headers and then return the same or a different map.
   * <p>This default implementation in this class returns the input map.
   * @param headers the headers to send (or {@code null} if none)
   * @return the actual headers to send (or {@code null} if none)
   */
  @Nullable
  protected Map<String, Object> processHeadersToSend(@Nullable Map<String, Object> headers) {
    return headers;
  }

}

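AbstractMessageSendingTemplate implements the MessageSendingOperations interface: every send and convertAndSend overload eventually funnels into the abstract method doSend(D destination, Message<?> message), which subclasses must implement.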
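To make that call chain concrete, here is a minimal, dependency-free sketch of the same template-method shape. The classes `SketchSendingTemplate` and `RecordingTemplate` are hypothetical stand-ins, not Spring types, and the `String.valueOf` conversion is only a placeholder for `doConvert`. What carries over from the quoted code is the delegation order: convertAndSend, then conversion, then doSend.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractMessageSendingTemplate<D>; not the Spring class.
abstract class SketchSendingTemplate<D> {
    private D defaultDestination;

    void setDefaultDestination(D destination) {
        this.defaultDestination = destination;
    }

    // No destination given: fall back to the default one or fail.
    void convertAndSend(Object payload) {
        if (defaultDestination == null) {
            throw new IllegalStateException("No 'defaultDestination' configured");
        }
        convertAndSend(defaultDestination, payload);
    }

    // Convert first, then hand off to the subclass hook.
    void convertAndSend(D destination, Object payload) {
        String converted = String.valueOf(payload); // placeholder for doConvert(...)
        doSend(destination, converted);
    }

    // The single extension point a concrete template must supply.
    protected abstract void doSend(D destination, String message);
}

// Hypothetical subclass that records what it was asked to send.
class RecordingTemplate extends SketchSendingTemplate<String> {
    final List<String> sent = new ArrayList<>();

    @Override
    protected void doSend(String destination, String message) {
        sent.add(destination + " <- " + message);
    }
}

class TemplateMethodSketch {
    public static void main(String[] args) {
        RecordingTemplate template = new RecordingTemplate();
        template.setDefaultDestination("demo-topic");
        template.convertAndSend(42);
        template.convertAndSend("other-topic", "hello");
        System.out.println(template.sent);
    }
}
```

A concrete template such as RocketMQClientTemplate only has to fill in the doSend hook; everything else is inherited.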

RocketMQClientTemplate

rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQClientTemplate.java

@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQClientTemplate extends AbstractMessageSendingTemplate<String> implements DisposableBean {

    private static final Logger log = LoggerFactory.getLogger(RocketMQClientTemplate.class);

    private ProducerBuilder producerBuilder;

    private SimpleConsumerBuilder simpleConsumerBuilder;

    private volatile Producer producer;

    private volatile SimpleConsumer simpleConsumer;

    private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();

    private String charset = "UTF-8";

    @Override
    protected void doSend(String destination, Message<?> message) {
        SendReceipt sendReceipt = syncSendGrpcMessage(destination, message, null, null);
        if (log.isDebugEnabled()) {
            log.debug("send message to `{}` finished. result:{}", destination, sendReceipt);
        }
    }

    /**
     * @param destination      formats: `topicName:tags`
     * @param message          {@link Message} the message to be sent.
     * @param messageDelayTime Time for message delay
     * @param messageGroup     message group name
     * @return SendReceipt Synchronous Task Results
     */
    public SendReceipt syncSendGrpcMessage(String destination, Message<?> message, Duration messageDelayTime, String messageGroup) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("send request message failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        SendReceipt sendReceipt = null;
        try {
            org.apache.rocketmq.client.apis.message.Message rocketMsg = this.createRocketMQMessage(destination, message, messageDelayTime, messageGroup);
            Producer grpcProducer = this.getProducer();
            try {
                sendReceipt = grpcProducer.send(rocketMsg);
                log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            } catch (Throwable t) {
                log.error("Failed to send message", t);
            }
        } catch (Exception e) {
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
        return sendReceipt;
    }    

    private org.apache.rocketmq.client.apis.message.Message createRocketMQMessage(String destination, Message<?> message, Duration messageDelayTime, String messageGroup) {
        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
        return RocketMQUtil.convertToClientMessage(getMessageConverter(), charset,
                destination, msg, messageDelayTime, messageGroup);
    }    

    //......
}

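RocketMQClientTemplate extends AbstractMessageSendingTemplate, and its doSend method calls syncSendGrpcMessage. That method first uses the parent's doConvert to produce a spring-messaging Message, then converts it to an org.apache.rocketmq.client.apis.message.Message via RocketMQUtil.convertToClientMessage, and finally sends it with grpcProducer.send(rocketMsg). Note that in the code quoted above, the inner try/catch around grpcProducer.send logs the Throwable without rethrowing it, so syncSendGrpcMessage can return null when the send fails.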
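The Javadoc quoted above gives the destination format as `topicName:tags`. The sketch below shows one way such a string could be split. `DestinationSketch` and `parse` are hypothetical names, and splitting on the first colon only is an assumption about the format, not a copy of what `RocketMQUtil.convertToClientMessage` does internally.

```java
class DestinationSketch {
    // Hypothetical helper: returns {topic, tag}; tag is "" when absent.
    static String[] parse(String destination) {
        // Limit 2 keeps any further colons inside the tag part.
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tag = parts.length > 1 ? parts[1] : "";
        return new String[] {topic, tag};
    }

    public static void main(String[] args) {
        String[] withTag = parse("orders:created");
        String[] withoutTag = parse("orders");
        System.out.println(withTag[0] + " / " + withTag[1]);
        System.out.println(withoutTag[0] + " / " + withoutTag[1]);
    }
}
```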

syncSendNormalMessage

    public SendReceipt syncSendNormalMessage(String destination, Object payload) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, null, null);
    }

    public SendReceipt syncSendNormalMessage(String destination, String payload) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, null, null);
    }

    public SendReceipt syncSendNormalMessage(String destination, Message<?> message) {
        return syncSendGrpcMessage(destination, message, null, null);
    }

    public SendReceipt syncSendNormalMessage(String destination, byte[] payload) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, null, null);
    }

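syncSendNormalMessage builds a Message with MessageBuilder (unless one is passed in directly) and sends it through syncSendGrpcMessage, with both messageDelayTime and messageGroup set to null.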

syncSendFifoMessage

    public SendReceipt syncSendFifoMessage(String destination, Object payload, String messageGroup) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, null, messageGroup);
    }

    public SendReceipt syncSendFifoMessage(String destination, String payload, String messageGroup) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, null, messageGroup);
    }

    public SendReceipt syncSendFifoMessage(String destination, byte[] payload, String messageGroup) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, null, messageGroup);
    }

    public SendReceipt syncSendFifoMessage(String destination, Message<?> message, String messageGroup) {
        return syncSendGrpcMessage(destination, message, null, messageGroup);
    }

syncSendFifoMessage likewise builds a Message with MessageBuilder; the only difference is that it passes a messageGroup when calling syncSendGrpcMessage.

syncSendDelayMessage

    public SendReceipt syncSendDelayMessage(String destination, Object payload, Duration messageDelayTime) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, messageDelayTime, null);
    }

    public SendReceipt syncSendDelayMessage(String destination, String payload, Duration messageDelayTime) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, messageDelayTime, null);
    }

    public SendReceipt syncSendDelayMessage(String destination, byte[] payload, Duration messageDelayTime) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return syncSendGrpcMessage(destination, message, messageDelayTime, null);
    }

    public SendReceipt syncSendDelayMessage(String destination, Message<?> message, Duration messageDelayTime) {
        return syncSendGrpcMessage(destination, message, messageDelayTime, null);
    }

syncSendDelayMessage also builds a Message with MessageBuilder; the only difference is that it passes a messageDelayTime when calling syncSendGrpcMessage.
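The three synchronous families therefore differ only in which optional arguments reach `syncSendGrpcMessage`. The following dependency-free sketch models that mapping. `SendArgs` and its three factory methods are hypothetical illustrations and are not part of rocketmq-spring.

```java
import java.time.Duration;

class SendArgs {
    final Duration delay;       // null unless the message is a delay message
    final String messageGroup;  // null unless the message is a FIFO message

    private SendArgs(Duration delay, String messageGroup) {
        this.delay = delay;
        this.messageGroup = messageGroup;
    }

    // Normal message: neither delay nor group.
    static SendArgs normal() {
        return new SendArgs(null, null);
    }

    // FIFO message: only the message group is set.
    static SendArgs fifo(String messageGroup) {
        return new SendArgs(null, messageGroup);
    }

    // Delay message: only the delay is set.
    static SendArgs delayed(Duration delay) {
        return new SendArgs(delay, null);
    }

    String describe() {
        return "delay=" + delay + ", group=" + messageGroup;
    }

    public static void main(String[] args) {
        System.out.println(normal().describe());
        System.out.println(fifo("order-42").describe());
        System.out.println(delayed(Duration.ofSeconds(10)).describe());
    }
}
```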

asyncSend

    public CompletableFuture<SendReceipt> asyncSend(String destination, Message<?> message, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("send request message failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        Producer grpcProducer = this.getProducer();
        try {
            org.apache.rocketmq.client.apis.message.Message rocketMsg = this.createRocketMQMessage(destination, message, messageDelayTime, messageGroup);
            future = grpcProducer.sendAsync(rocketMsg);
        } catch (Exception e) {
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
        return future;
    }

    public CompletableFuture<SendReceipt> asyncSendWithObjectPayload(String destination, Object payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return asyncSend(destination, message, messageDelayTime, messageGroup, future);
    }

    public CompletableFuture<SendReceipt> asyncSendWithStringPayload(String destination, String payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return asyncSend(destination, message, messageDelayTime, messageGroup, future);
    }

    public CompletableFuture<SendReceipt> asyncSendWithBytePayload(String destination, byte[] payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return asyncSend(destination, message, messageDelayTime, messageGroup, future);
    }

    public CompletableFuture<SendReceipt> asyncSendWithMessagePayload(String destination, Message<?> payload, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) {
        return asyncSend(destination, payload, messageDelayTime, messageGroup, future);
    }        

asyncSend works much like syncSendGrpcMessage: it first converts the message to an org.apache.rocketmq.client.apis.message.Message via createRocketMQMessage, but then sends it with grpcProducer.sendAsync. asyncSendWithObjectPayload, asyncSendWithStringPayload, asyncSendWithBytePayload, and asyncSendWithMessagePayload all end up calling asyncSend.
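One detail in the quoted asyncSend is easy to miss: the `future` parameter is reassigned to the result of `grpcProducer.sendAsync(rocketMsg)`, so the object the caller passed in is never completed by the send. The sketch below reproduces that shape with plain `CompletableFuture`. `fakeSendAsync` and `asyncSendLike` are hypothetical stand-ins for the producer call and the template method.

```java
import java.util.concurrent.CompletableFuture;

class AsyncFutureSketch {
    // Hypothetical stand-in for grpcProducer.sendAsync(...): already completed.
    static CompletableFuture<String> fakeSendAsync(String body) {
        return CompletableFuture.completedFuture("receipt-for-" + body);
    }

    // Same shape as the quoted asyncSend: the parameter is overwritten.
    static CompletableFuture<String> asyncSendLike(String body, CompletableFuture<String> future) {
        future = fakeSendAsync(body);
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> passedIn = new CompletableFuture<>();
        CompletableFuture<String> returned = asyncSendLike("hello", passedIn);

        // The returned future carries the result; the passed-in one stays pending.
        System.out.println("returned done: " + returned.isDone());
        System.out.println("passedIn done: " + passedIn.isDone());
        returned.thenAccept(receipt -> System.out.println("got " + receipt));
    }
}
```

Under that reading, callbacks are best attached to the returned future rather than to the one passed in.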

asyncSendNormalMessage

    public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, Object payload, CompletableFuture<SendReceipt> future) {
        return asyncSendWithObjectPayload(destination, payload, null, null, future);
    }

    public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, String payload, CompletableFuture<SendReceipt> future) {
        return asyncSendWithStringPayload(destination, payload, null, null, future);
    }

    public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, byte[] payload, CompletableFuture<SendReceipt> future) {
        return asyncSendWithBytePayload(destination, payload, null, null, future);
    }

    public CompletableFuture<SendReceipt> asyncSendNormalMessage(String destination, Message<?> payload, CompletableFuture<SendReceipt> future) {
        return asyncSendWithMessagePayload(destination, payload, null, null, future);
    }

asyncSendNormalMessage delegates to asyncSendWithObjectPayload, asyncSendWithStringPayload, asyncSendWithBytePayload, and asyncSendWithMessagePayload, all of which end up calling asyncSend.

asyncSendFifoMessage

    public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, Object payload, String messageGroup, CompletableFuture<SendReceipt> future) {
        return asyncSendWithObjectPayload(destination, payload, null, messageGroup, future);
    }

    public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, String payload, String messageGroup, CompletableFuture<SendReceipt> future) {
        return asyncSendWithStringPayload(destination, payload, null, messageGroup, future);
    }

    public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, byte[] payload, String messageGroup, CompletableFuture<SendReceipt> future) {
        return asyncSendWithBytePayload(destination, payload, null, messageGroup, future);
    }

    public CompletableFuture<SendReceipt> asyncSendFifoMessage(String destination, Message<?> payload, String messageGroup, CompletableFuture<SendReceipt> future) {
        return asyncSendWithMessagePayload(destination, payload, null, messageGroup, future);
    }

asyncSendFifoMessage delegates to the same four asyncSendWith...Payload methods; the only difference is that it passes a messageGroup.

asyncSendDelayMessage

    public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, Object payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
        return asyncSendWithObjectPayload(destination, payload, messageDelayTime, null, future);
    }

    public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, String payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
        return asyncSendWithStringPayload(destination, payload, messageDelayTime, null, future);
    }

    public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, byte[] payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
        return asyncSendWithBytePayload(destination, payload, messageDelayTime, null, future);
    }

    public CompletableFuture<SendReceipt> asyncSendDelayMessage(String destination, Message<?> payload, Duration messageDelayTime, CompletableFuture<SendReceipt> future) {
        return asyncSendWithMessagePayload(destination, payload, messageDelayTime, null, future);
    }

asyncSendDelayMessage delegates to the same four asyncSendWith...Payload methods; the only difference is that it passes a messageDelayTime.

sendMessageInTransaction

    public Pair<SendReceipt, Transaction> sendMessageInTransaction(String destination, Object payload) throws ClientException {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return sendTransactionMessage(destination, message);
    }

    public Pair<SendReceipt, Transaction> sendMessageInTransaction(String destination, String payload) throws ClientException {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return sendTransactionMessage(destination, message);
    }

    public Pair<SendReceipt, Transaction> sendMessageInTransaction(String destination, byte[] payload) throws ClientException {
        Message<?> message = MessageBuilder.withPayload(payload).build();
        return sendTransactionMessage(destination, message);
    }

    /**
     * @param destination formats: `topicName:tags`
     * @param message     {@link Message} the message to be sent.
     * @return CompletableFuture<SendReceipt> Asynchronous Task Results
     */
    public Pair<SendReceipt, Transaction> sendTransactionMessage(String destination, Message<?> message) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("send request message failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        final SendReceipt sendReceipt;
        Producer grpcProducer = this.getProducer();
        org.apache.rocketmq.client.apis.message.Message rocketMsg = this.createRocketMQMessage(destination, message, null, null);
        final Transaction transaction;
        try {
            transaction = grpcProducer.beginTransaction();
            sendReceipt = grpcProducer.send(rocketMsg, transaction);
            log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
            throw new RuntimeException(e);
        }
        return new Pair<>(sendReceipt, transaction);
    }    

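sendMessageInTransaction first builds the message with MessageBuilder and then calls sendTransactionMessage. sendTransactionMessage again converts it to an org.apache.rocketmq.client.apis.message.Message via createRocketMQMessage, then sends the transactional message using grpcProducer.beginTransaction() followed by grpcProducer.send(rocketMsg, transaction). The returned Pair carries both the SendReceipt and the still-open Transaction.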
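Because sendTransactionMessage hands back the Transaction without resolving it, the caller decides the outcome. Below is a dependency-free sketch of that two-phase flow. `SketchTransaction` and `resolve` are hypothetical stand-ins, and the commit-on-success, rollback-on-failure policy is an assumed usage pattern; the real `Transaction` type should be consulted for its exact contract.

```java
class TransactionFlowSketch {
    // Hypothetical stand-in for the transaction handle returned with the receipt.
    static class SketchTransaction {
        String state = "PENDING";

        void commit() {
            state = "COMMITTED";
        }

        void rollback() {
            state = "ROLLED_BACK";
        }
    }

    // Runs the local business step and resolves the transaction accordingly.
    static String resolve(SketchTransaction tx, Runnable localWork) {
        try {
            localWork.run();
            tx.commit();   // local work succeeded: let the message be delivered
        } catch (RuntimeException e) {
            tx.rollback(); // local work failed: discard the message
        }
        return tx.state;
    }

    public static void main(String[] args) {
        System.out.println(resolve(new SketchTransaction(), () -> { }));
        System.out.println(resolve(new SketchTransaction(), () -> {
            throw new IllegalStateException("local step failed");
        }));
    }
}
```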

receive

    public List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException {
        SimpleConsumer simpleConsumer = this.getSimpleConsumer();
        return simpleConsumer.receive(maxMessageNum, invisibleDuration);
    }


    public CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException, IOException {
        SimpleConsumer simpleConsumer = this.getSimpleConsumer();
        CompletableFuture<List<MessageView>> listCompletableFuture = simpleConsumer.receiveAsync(maxMessageNum, invisibleDuration);
        simpleConsumer.close();
        return listCompletableFuture;
    }

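receive pulls messages through simpleConsumer.receive, while receiveAsync pulls them through simpleConsumer.receiveAsync. Note that receiveAsync, as quoted, also calls simpleConsumer.close() right after issuing the request and before returning the future.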

ack

    public void ack(MessageView message) throws ClientException {
        SimpleConsumer simpleConsumer = this.getSimpleConsumer();
        simpleConsumer.ack(message);
    }


    public CompletableFuture<Void> ackAsync(MessageView messageView) {
        SimpleConsumer simpleConsumer = this.getSimpleConsumer();
        return simpleConsumer.ackAsync(messageView);
    }

ack is performed through simpleConsumer.ack, and ackAsync through simpleConsumer.ackAsync.
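Taken together, receive and ack form a pull-then-confirm loop. The sketch below models it without the RocketMQ client. `SketchConsumer` is a hypothetical in-memory stand-in for SimpleConsumer, and it does not simulate the `invisibleDuration` argument that the real `receive` takes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReceiveAckSketch {
    // Hypothetical in-memory stand-in for SimpleConsumer.
    static class SketchConsumer {
        final Deque<String> pending = new ArrayDeque<>();
        final List<String> acked = new ArrayList<>();

        // Hands out up to maxMessageNum pending messages.
        List<String> receive(int maxMessageNum) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxMessageNum && !pending.isEmpty()) {
                batch.add(pending.poll());
            }
            return batch;
        }

        // Records that a message was processed successfully.
        void ack(String message) {
            acked.add(message);
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        consumer.pending.add("m1");
        consumer.pending.add("m2");
        consumer.pending.add("m3");

        // Pull a batch, process each message, then acknowledge it.
        for (String message : consumer.receive(2)) {
            System.out.println("processing " + message);
            consumer.ack(message);
        }
        System.out.println("acked=" + consumer.acked + ", left=" + consumer.pending.size());
    }
}
```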

Summary

RocketMQClientTemplate in RocketMQ 5 extends org.springframework.messaging.core.AbstractMessageSendingTemplate and thereby implements spring-messaging's MessageSendingOperations interface. On top of that, it adds the following methods:

  • syncSendNormalMessage, syncSendFifoMessage, syncSendDelayMessage
  • asyncSendNormalMessage, asyncSendFifoMessage, asyncSendDelayMessage
  • sendMessageInTransaction
  • receive, receiveAsync
  • ack, ackAsync

Under the hood, sending relies on grpcProducer.send, sendAsync, and send(rocketMsg, transaction), while consumption relies on SimpleConsumer.
