文章目录
- 分布式事务–消息队列
- 1.思考
- 2.分布式事务
- 3.XA协议
- 4.TCC
- 5.消息队列
- 6.疑问
- 消息队列
- 1.消息产品
- 2.ActiveMQ
- 2.1 整合activemq
- 2.2 队列消息
- 2.2.1 生产者
- 2.2.2 消费者
- 2.3 消息事务
- 2.4 消息持久化
分布式事务–消息队列
1.思考
sso服务
用户服务
日志服务
购物服务(购物车合并)
短信服务
订单服务
库存服务
物流服务
如何让这么多的服务并行执行?【涉及到分布式事务:为了保证数据的一致性】
2.分布式事务
分布式事务:在分布式环境下,如何保证数据一致性
分布式事务会涉及到性能太低的一个通病。
方案:
- xa协议下的两段式提交
- xa两段式提交的进阶版:tcc
- 基于消息,采用最终一致性策略的分布式事务
LNC 分布式框架. 分布式事务理论基础:CPA理论、BASE理论
3.XA协议
XA协议:数据库与事务管理器的一个标准。
在xa协议下,提交一个事务需要经过两个阶段
阶段一:预备提交
阶段二:提交
4.TCC
需要在业务层实现,try,confirm,和cancle的接口。
5.消息队列
在一个事务正在进行的同时,发出消息给其他的业务,如果消息发送失败,或者消息的执行失败,则回滚消息,重复执行,反复执行失败后,记录失败信息,后期补充性的处理;在消息系统中开启事务,消息的事务是指,保证消息被正常消费,否则回滚的一种机制
补偿机制:日志记录,定时器在某个时间再执行(重试执行)
重复执行,需要考虑幂等性处理逻辑。
6.疑问
- 如何确保消息发送成功? 消息应答模式?
- 消息发送失败如何处理?
- 消息事务?
- 消息幂等性如何处理?
- 消息阻塞?死信队列。
消息队列
1.消息产品
RabbitMQ 、 Kafka、ActiveMQ
- RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco);AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。RabbitMQ用Erlang开发
- ActiveMQ使用的是JMS(Java Messaging Service )协议,JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。ActiveMQ也支持AMQP协议。
- Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好;Kafka在传输过程中可能会出现消息重复的情况,不保证发送顺序,没有消息事务功能;一般使用kafka处理大数据日志。
2.ActiveMQ
2.1 整合activemq
只需要加入整合依赖
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
2.2 队列消息
Queue 队列模式
Topic 发布订阅模式
Consumer 使用监听器监听MQ上是否有消息。
2.2.1 生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import javax.jms.*;
public class Product {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.7:61616");
try {
Connection connection = connectionFactory.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //Session.SESSION.TRASACTED 开启消息事务
Queue testqueue = session.createQueue("TEST1");
MessageProducer producer = session.createProducer(testqueue);
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("今天天气真好!我想出去走一走");
producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化
producer.send(textMessage);
session.commit();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
2.2.2 消费者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.1.7:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("TEST1");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
// session.rollback();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
session.rollback();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
}
2.3 消息事务
producer提交时的事务 | 事务开启 | 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。 |
---|---|---|
事务不开启 | 只要执行send,就进入到队列中。 | |
consumer 接收时的事务 | 事务开启,签收必须写Session.SESSION_TRANSACTED | 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。 |
事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE | 只要调用comsumer.receive方法 ,自动确认。 | |
事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE | 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。 | |
事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE | 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。 |
2.4 消息持久化
通过producer.setDeliveryMode(DeliveryMode.PERSISTENT)
进行设置
持久化的好处就是当activemq
宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/100743.html原文链接:https://javaforall.cn