优先级队列
这块配置要么是消息队列的优先级要么就是具体消息的优先级。
如下:
代码语言:javascript复制 @Bean
public Queue directOneQueue() {
Map map=new HashMap<>();
map.put("x-max-priority",10)
return new Queue("DDD",true,false,false,map);
}
或者:
代码语言:javascript复制 private Message dealMessage(Object message) {
byte[] body = JSON.toJSONBytes(message, SerializeConfig.globalInstance);
//设置消息相关属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
messageProperties.setPriority(10);
return new Message(body, messageProperties);
}
这里要说的一点就是优先级队列是通过设置消息的优先级来做的,为什么这样可以实现优先级队列是应该mq中才有了策略让优先级大的消息提前被消费,但是前提是消息的消费的速度要小于消息的生产速度,这个很好理解,想想队列就一个消息那么就谈不上优不优先的问题。
RPC实现
what is rpc?remote procedure call的简称,就是远程调用。是一种通过网络从远程计算机上请求服务,而不需要了解底层网路的技术。
rpc协议有很多,比如java的rmi,webservice,corba等。
使用rabbitmq实现rpc其实也是客户端发送请求,服务端处理之后返回响应消息,为了接受消息,我们需要在请求中发送一个回调队列。服务端处理完毕之后再将结果通过回调队列进行返回。具体代码如下:
代码语言:javascript复制 @Bean
public Queue directOneQueue() {
Map map=new HashMap<>();
map.put("x-max-priority",10);
return new Queue("DDD",true,false,false,map);
}
@Bean
public Queue directTwoQueue() {
Map map=new HashMap<>();
return new Queue("EEE",true,false,false,map);
}
@GetMapping(value = "/test")
public void test(){
Test t=new Test();
t.setName("tianjingle-ceshi");
byte[] body = JSON.toJSONBytes(t, SerializeConfig.globalInstance);
//设置消息相关属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
messageProperties.setPriority(10);
messageProperties.setCorrelationId("tianjingle");
messageProperties.setReplyTo("EEE");
Message message1 =new Message(body, messageProperties);
rabbitTemplate.convertAndSend("DDD",message1);
}
代码语言:javascript复制@Slf4j
@Component
public class DelayQueueHandler {
/**
* 消息队列监听
*
* @param message
* @param channel
*/
@RabbitListener(queues = "DDD")
public void directHandlerManualAck(Message message, Channel channel) throws IOException {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
String replyTo = message.getMessageProperties().getReplyTo();
String uid=message.getMessageProperties().getCorrelationId();
try {
byte[] str=message.getBody();
Object a=JSONArray.parse(str);
Test strs= JSON.parseObject(a.toString(), Test.class);
System.out.println(strs.toString());
System.out.println("消息消费" replyTo);
//设置消息相关属性
} finally {
AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder().correlationId(uid).contentType(MediaType.APPLICATION_JSON_VALUE).build();
Test t=new Test();
t.setName("tianjingle-fanui--1111");
byte[] body = JSON.toJSONBytes(t, SerializeConfig.globalInstance);
channel.basicPublish("",replyTo,basicProperties,body);
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
}
}
/**
* 消息队列监听
*
* @param message
* @param channel
*/
@RabbitListener(queues = "EEE")
public void directHandlerManualAckEEE(Message message, Channel channel) throws IOException {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
if (message.getMessageProperties().getCorrelationId().equalsIgnoreCase("tianjingle")) {
System.out.println("消息rpc返回来了....");
byte[] str=message.getBody();
Object a=JSONArray.parse(str);
Test strs= JSON.parseObject(a.toString(), Test.class);
System.out.println(strs.toString());
}
} finally {
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
}
}
}
需要说明的是在发送消息已经回调的时候都是通过correlationId来进行识别的,其实在分布式条件下具体是哪个服务处理其实都一样,correlationId还是用来处理不同的返回结果的情况,因此需要针对具体的服务设置不同的correlationId来处理特定的消息,除此之外我们也可以用correlationId来进行消息路由。