基于ActiveMQ的请求-应答模式

2020-09-03 15:31:40 浏览数 (2)

基于ActiveMQ的请求-应答模式

一. 使用场景

基于ActiveMQ的请求-应答模式,相当于通过消息队列,请求端注册了一个异步回调,在发送消息时指定回调消息的目的地和关联的id,这样应答端在收到请求消息时,可以在处理后,将处理结果的应答消息发送到回调的目的地中。

二. 代码实例

首先是请求和响应的消息定义:

代码语言:javascript复制
/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:40
 * @Description:请求消息
 */
@Getter
@Setter
@ToString
public class RequestDto {
    //业务id
    private String uid;

    //业务数据
    private String payload;
}
代码语言:javascript复制
/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:33
 * @Description:响应消息
 */
@Getter
@Setter
@ToString
public class ResponseDto {
    //业务id
    private String uid;

    //处理结果
    private boolean success;
}

请求端:

代码语言:javascript复制
/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:09
 * @Description:Request-Response模式的请求端
 */
@Service
public class Producer {
    //保存所有请求的业务id和响应结果
    private static final Map<String, Boolean> replies = new ConcurrentHashMap<>();

    @Autowired
    @Qualifier(Constants.JMS_QUEUE_TEMPLATE)
    private JmsTemplate queueTemplat;

    public void sendMessage(RequestDto dto) {
        queueTemplat.send(Constants.QueueNames.REQUEST_QUEUE, session -> {
            TextMessage message = session.createTextMessage(dto.getPayload());

            //设置消息关联id,将请求和应答消息关联起来
            message.setJMSCorrelationID(dto.getUid());

            //设置消息回复的目的地
            message.setJMSReplyTo(new ActiveMQQueue(Constants.QueueNames.RESPONSE_QUEUE));

            //记录发送的请求
            replies.putIfAbsent(dto.getUid(), false);
            return message;
        });
    }

    @JmsListener(destination = Constants.QueueNames.RESPONSE_QUEUE, containerFactory = Constants.QUEUE_LISTENER_CONTAINER_FACTORY)
    public void onReply(ResponseDto dto) {
        replies.put(dto.getUid(), dto.isSuccess());
        System.err.println("On Reply: "   dto);
    }
}

响应端:

代码语言:javascript复制
/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:42
 * @Description:Request-Response模式的响应端
 */
@Service
public class Consumer {
    @Autowired
    @Qualifier(Constants.JMS_QUEUE_TEMPLATE)
    private JmsTemplate queueTemplat;

    @JmsListener(destination = Constants.QueueNames.REQUEST_QUEUE, containerFactory = Constants.QUEUE_LISTENER_CONTAINER_FACTORY)
    public void onRequest(TextMessage message) throws JMSException {
        //获取消息回复目的地和关联id,向回复目的地发送回复消息
        Destination replyTo = message.getJMSReplyTo();
        ResponseDto response = new ResponseDto();
        response.setUid(message.getJMSCorrelationID());
        response.setSuccess(true);
        queueTemplat.convertAndSend(replyTo,response);
    }
}

0 人点赞