TTL队列/消息
TTL: Time To Live, 生存时间
- RabbitMQ支持消息的过期时间, 在消息发送时可以指定
- RabbitMQ支持队列的过期时间, 从消息进入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会被自动清除
TTL队列代码实现
消费者
代码语言:javascript复制package com.dance.redis.mq.rabbit.ttl;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Receiver4TTLExchange {
public static void main(String[] args) throws Exception {
// TTL队列
Channel channel = RabbitMQHelper.getChannel();
// 声明正常的 exchange queue 路由规则
String queueName = "test_ttl_queue";
String exchangeName = "test_ttl_exchange";
String exchangeType = "topic";
String routingKey = "ttl.*";
RabbitMQHelper.exchangeDeclare(channel, exchangeName, RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
Map<String, Object> arguments = new HashMap<>();
// 指定队列的消息过期时间
arguments.put("x-message-ttl", 6000);
// 添加队列扩展参数
RabbitMQHelper.queueDeclare(channel, queueName, true, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("receive message:" new String(body) ", RoutingKey: " envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(queueName, false, consumer);
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
代码语言:javascript复制package com.dance.redis.mq.rabbit.ttl;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class Sender4TTLExchange {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_ttl_exchange";
String routingKey = "ttl.test";
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
String msg = "Hello World RabbitMQ 4 TTL Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
}
}
TTL队列测试
启动消费者
启动生产者
查看消费者
消费成功
此时, 停止消费者, 查看控制台
从队列上的标记features, 也可以看到这是一个TTL队列
队列中是没有消息的,不要启动消费者, 直接启动生产者发送一条消息
查看控制台
可以看到有一条消息, 等待6秒再次查看
消息已经被删除
TTL消息代码实现
消费者
代码语言:javascript复制package com.dance.redis.mq.rabbit.ttl.message;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Receiver4TTLExchange {
public static void main(String[] args) throws Exception {
// TTL队列
Channel channel = RabbitMQHelper.getChannel();
// 声明正常的 exchange queue 路由规则
String queueName = "test_ttl_queue_message";
String exchangeName = "test_ttl_exchange_message";
String routingKey = "ttl.message.*";
RabbitMQHelper.exchangeDeclare(channel, exchangeName, RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
RabbitMQHelper.queueDeclare(channel, queueName, true, null);
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("receive message:" new String(body) ", RoutingKey: " envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(queueName, false, consumer);
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
代码语言:javascript复制package com.dance.redis.mq.rabbit.ttl.message;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class Sender4TTLExchange {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_ttl_exchange_message";
String routingKey = "ttl.message.test";
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
// TTL 消息时间 10秒
.expiration("10000")
.headers(headers).build();
String msg = "Hello World RabbitMQ 4 TTL Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
}
}
TTL消息测试
启动消费者
启动生产者
查看消费者
消费成功, 此时停止消费者, 查看控制台
队列上并没有TTL标记, 所以这不是TTL队列, 这个时候不要启动消费者, 直接启动生产者, 发送一条消息
可以看到有了一条消息, 等待10秒
消息没有了, 消息已被删除