生产者
代码语言:javascript复制/**
* 消息被拒的情况
*/
public class Produce0001 {
private static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//该消息用作队列的个数限制
for(int i=0;i<10;i )
{
String message="info" i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息" message);
}
}
}
消费者:
代码语言:javascript复制/**
* 消息被拒的情况
*/
public class Consumer0001 {
//普通交换机
private static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机
private static final String DEAD_EXCHANGE="dead_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
//声明死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue="dead_queue";
channel.queueDeclare(deadQueue,false,false,false,null);
//死信队列绑定交换和routingKey值
channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
//正常队列绑定死信队列
Map<String,Object> params=new HashMap<>();
//正常队列设置死信交换机,参数key是固定值
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常队列设置死信routing-key,参数key是固定值
params.put("x-dead-letter-routing-key", "lisi");
//正常队列设置的最大限制长度
params.put("x-max-length",6);
System.out.println("等待接收消息....");
String normalQueue="normal_queue";
channel.queueDeclare(normalQueue,false,false,false,params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
DeliverCallback deliverCallback=(consumerTag, message) -> {
String s = new String(message.getBody(), StandardCharsets.UTF_8);
if (s.equals("info5"))
{
System.out.println("info5拒接");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}
else
{
System.out.println("01接收到消息" s);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});
}
}
结果: