一、事务transaction
1、事务偏生产者/签收偏消费者
2、生产者提交事务只有两个状态true/false
false:只需要执行send方法,消息就会进入队列中
关闭事务,那第二个参数签收的设置需要有效
true:先执行send方法,在执行commit,消息才被真正的提交到队列中
消息需要批量发送,需要缓冲区处理
未开启事务生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/2 17:04
* @Version: 1.0
*/
public class ActiveMQTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事务,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建队列
Queue queue=session.createQueue(QUEUE_NAME);
//5、创建消息生产者,队列模式
MessageProducer messageProducer = session.createProducer(queue);
//6、通过messageProducer生产三条消息发送到MQ消息队列中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//消息属性
textMessage.setStringProperty("c01","vip");
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
//9、数据非持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
已开启事务生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/2 17:04
* @Version: 1.0
*/
public class ActiveMQTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事务,第二个是签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建队列
Queue queue=session.createQueue(QUEUE_NAME);
//5、创建消息生产者,队列模式
MessageProducer messageProducer = session.createProducer(queue);
//6、通过messageProducer生产三条消息发送到MQ消息队列中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//消息属性
textMessage.setStringProperty("c01","vip");
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
//9、数据非持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
messageProducer.close();
session.commit();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
二、签收
一、非事务签收
1、自动签收(默认)
2、手动签收
Session.CLIENT_ACKNOWLEDGE,需要客户端调用acknowledge(message.acknowledge();)方法进行手动签收
3、允许重复消息
Session.DUPS_OK_ACKNOWLEDGE
4、事务级
Session.SESSION_TRANSACTED
二、签收
1、非事务手动签收
需要客户端调用acknowledge(message.acknowledge();)方法进行手动签收,如果不签收,消息重复消费
2、开启事务,设置手动签收
有commit语句的情况下,不需要调用message.acknowledge();方法进行签收,开启事务会认为你自动签收
没有commit语句的情况下,即使调用message.acknowledge();方法进行签收,也会出现重复消费
三、总结
1、事务大于签收,所以一定要在开启事务的情况下进行签收,在事务性会话当中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。
2、非事务性会话当中,消息何时被签收取决于创建会话时的应答模式(message.acknowledge();)
四、MQ消息可靠性:
1、事物
2、签收
3、消息持久性
4、集群