死信队列监听一开始的逻辑是正确的,但关于监听的内容以及动态判断有了新的思路,不断发现不断改善。
监听新思路
1.不必去破坏生产者消费者的关系,去创建死信队列的对应消费者,如果不同队列去创建对应的死信队列监听,没什么意义,复用刚开始的思路进行更改。mq配置更改指定监听死信队列,死信队列的名称是可以配置指定的
代码语言:javascript复制@Bean
public ActiveMQConnectionFactory connectionFactory() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerurl);
factory.setTrustAllPackages(true);//信任所有包下的序列化对象,解决无法发送对象消息
javax.jms.Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("ActiveMQ.DLQ");
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new QueueListener());
factory.setUserName(userName);
factory.setPassword(password);
return factory;
}
新建监听类
代码语言:javascript复制/**
* @author zhaokkstart
* @create 2020-09-11 11:18
*/
public class QueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("****消费者的消息:" message);
}
}
看下能从message获取到什么东西
代码语言:javascript复制public interface Message {
static final int DEFAULT_DELIVERY_MODE = DeliveryMode.PERSISTENT;
static final int DEFAULT_PRIORITY = 4;
static final long DEFAULT_TIME_TO_LIVE = 0;
String getJMSMessageID() throws JMSException;
void setJMSMessageID(String id) throws JMSException;
long getJMSTimestamp() throws JMSException;
void setJMSTimestamp(long timestamp) throws JMSException;
byte[] getJMSCorrelationIDAsBytes() throws JMSException;
void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
void setJMSCorrelationID(String correlationID) throws JMSException;
String getJMSCorrelationID() throws JMSException;
Destination getJMSReplyTo() throws JMSException;
void setJMSReplyTo(Destination replyTo) throws JMSException;
Destination getJMSDestination() throws JMSException;
void setJMSDestination(Destination destination) throws JMSException;
int getJMSDeliveryMode() throws JMSException;
void setJMSDeliveryMode(int deliveryMode) throws JMSException;
boolean getJMSRedelivered() throws JMSException;
void setJMSRedelivered(boolean redelivered) throws JMSException;
String getJMSType() throws JMSException;
void setJMSType(String type) throws JMSException;
long getJMSExpiration() throws JMSException;
void setJMSExpiration(long expiration) throws JMSException;
int getJMSPriority() throws JMSException;
void setJMSPriority(int priority) throws JMSException;
void clearProperties() throws JMSException;
boolean propertyExists(String name) throws JMSException;
boolean getBooleanProperty(String name) throws JMSException;
byte getByteProperty(String name) throws JMSException;
short getShortProperty(String name) throws JMSException;
int getIntProperty(String name) throws JMSException;
long getLongProperty(String name) throws JMSException;
float getFloatProperty(String name) throws JMSException;
double getDoubleProperty(String name) throws JMSException;
String getStringProperty(String name) throws JMSException;
Object getObjectProperty(String name) throws JMSException;
Enumeration getPropertyNames() throws JMSException;
void setBooleanProperty(String name, boolean value) throws JMSException;
void setByteProperty(String name, byte value) throws JMSException;
void setShortProperty(String name, short value) throws JMSException;
void setIntProperty(String name, int value) throws JMSException;
void setLongProperty(String name, long value) throws JMSException;
void setFloatProperty(String name, float value) throws JMSException;
void setDoubleProperty(String name, double value) throws JMSException;
void setStringProperty(String name, String value) throws JMSException;
void setObjectProperty(String name, Object value) throws JMSException;
void acknowledge() throws JMSException;
void clearBody() throws JMSException;}
还是对象里的那点东西,到有个新思路
代码语言:javascript复制commandId = 5,
responseRequired = true,
messageId = ID: kk - 59648 - 1599635155556 - 1: 239: 1: 1: 1,
originalDestination = null, originalTransactionId = null,
producerId = ID: kk - 59648 - 1599635155556 - 1: 239: 1: 1,
destination = queue: //add_xxxxxx,
transactionId = null, expiration = 0, timestamp = 1599636301936, arrival = 0,
brokerInTime = 1599636301937, brokerOutTime = 1599636302110,
correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null,
content = org.apache.activemq.util.ByteSequence@54eae153,
marshalledProperties = org.apache.activemq.util.ByteSequence@1318dd4d,
dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1599636300958},
readOnlyProperties = true, readOnlyBody = true, droppable = false,
jmsXGroupFirstForConsumer = false}
不必去指定type维护队列的信息关系,获取
destination = queue: //add_xxxxxx,
即获取originalDestination属性,判断此消息的入队队列是哪个,然后获取该队列的消费者入队消息进行转换
这个方法是肯定能监听到死信队列的,亲测有效。
(1) 死信队列的配置(一般采用默认)
1. sharedDeadLetterStrategy
不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
2. individualDeadLetterStrategy
可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
代码语言:javascript复制activemq的API文档 http://activemq.apache.org/maven/apidocs/index.html