基于ActiveMQ的请求-应答模式
一. 使用场景
基于ActiveMQ的请求-应答模式,相当于通过消息队列,请求端注册了一个异步回调,在发送消息时指定回调消息的目的地和关联的id,这样应答端在收到请求消息时,可以在处理后,将处理结果的应答消息发送到回调的目的地中。
二. 代码实例
首先是请求和响应的消息定义:
代码语言:javascript复制/**
* @Auther: ZhangShenao
* @Date: 2019/2/12 18:40
* @Description:请求消息
*/
@Getter
@Setter
@ToString
public class RequestDto {
//业务id
private String uid;
//业务数据
private String payload;
}
代码语言:javascript复制/**
* @Auther: ZhangShenao
* @Date: 2019/2/12 18:33
* @Description:响应消息
*/
@Getter
@Setter
@ToString
public class ResponseDto {
//业务id
private String uid;
//处理结果
private boolean success;
}
请求端:
代码语言:javascript复制/**
* @Auther: ZhangShenao
* @Date: 2019/2/12 18:09
* @Description:Request-Response模式的请求端
*/
@Service
public class Producer {
//保存所有请求的业务id和响应结果
private static final Map<String, Boolean> replies = new ConcurrentHashMap<>();
@Autowired
@Qualifier(Constants.JMS_QUEUE_TEMPLATE)
private JmsTemplate queueTemplat;
public void sendMessage(RequestDto dto) {
queueTemplat.send(Constants.QueueNames.REQUEST_QUEUE, session -> {
TextMessage message = session.createTextMessage(dto.getPayload());
//设置消息关联id,将请求和应答消息关联起来
message.setJMSCorrelationID(dto.getUid());
//设置消息回复的目的地
message.setJMSReplyTo(new ActiveMQQueue(Constants.QueueNames.RESPONSE_QUEUE));
//记录发送的请求
replies.putIfAbsent(dto.getUid(), false);
return message;
});
}
@JmsListener(destination = Constants.QueueNames.RESPONSE_QUEUE, containerFactory = Constants.QUEUE_LISTENER_CONTAINER_FACTORY)
public void onReply(ResponseDto dto) {
replies.put(dto.getUid(), dto.isSuccess());
System.err.println("On Reply: " dto);
}
}
响应端:
代码语言:javascript复制/**
* @Auther: ZhangShenao
* @Date: 2019/2/12 18:42
* @Description:Request-Response模式的响应端
*/
@Service
public class Consumer {
@Autowired
@Qualifier(Constants.JMS_QUEUE_TEMPLATE)
private JmsTemplate queueTemplat;
@JmsListener(destination = Constants.QueueNames.REQUEST_QUEUE, containerFactory = Constants.QUEUE_LISTENER_CONTAINER_FACTORY)
public void onRequest(TextMessage message) throws JMSException {
//获取消息回复目的地和关联id,向回复目的地发送回复消息
Destination replyTo = message.getJMSReplyTo();
ResponseDto response = new ResponseDto();
response.setUid(message.getJMSCorrelationID());
response.setSuccess(true);
queueTemplat.convertAndSend(replyTo,response);
}
}