消息中间件
消息中间件
什么是消息中间件
概述
消息中间件可以理解成就是一个服务软件,保存信息的容器,比如生活中的快递云柜. 我们把数据放到消息中间件当中, 然后通知对应的服务进行获取 消息中间件是在消息的传输过程中保存信息的容器
消息中间件应用场景
- 使用消息服务器当做大的队列使用, 先进先出, 来处理高并发写入操作
- 使用消息服务器可以将业务系统的串行执行改为并行执行, 处理效率高, 更合理的榨取服务器的性能.
同步与异步技术
同步技术
dubbo是一中同步技术, 实时性高, controller调用service项目, 调用就执行, 如果service项目中的代码没有执行完, controller里面的代码一致等待结果.
异步技术
mq消息中间件技术(jms) 是一种异步技术, 消息发送方, 将消息发送给消息服务器, 消息服务器未必立即处理.什么时候去处理, 主要看消息服务器是否繁忙, 消息进入服务器后会进入队列中, 先进先出.实时性不高.
JMS
概述:
jms的全称叫做Java message service (Java消息服务) jms是jdk底层定义的规范 各大厂商都是实现这个规范的技术
jms消息服务器同类型技术
ActiveMQ:是apache的一个比较老牌的消息中间件, 它比较均衡, 既不是最安全的, 也不是最快的. RabbitMQ:是阿里巴巴的一个消息中间件, 更适合金融类业务, 它对数据的安全性比较高.能够保证数据不丢失. Kafka:Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;适合处理海量数据。
JMS中支持的消息类型:
TextMessage: 一个字符串对象 MapMessage:key-value ObjectMessage:一个序列化的 Java 对象 BytesMessage:一个字节的数据流 StreamMessage:Java 原始值的数据流
JMS中的两种发送模式
点对点模式
一个发送方, 一个接收方. 也可以多个发送方, 一个接收方, 主要是接收方必须是第一个.
订阅发布模式
一个发送方, 多个接收方. 发送方也可以是多个, 主要看接收方, 接收方必须是多个
ActiveMQ安装
链接: https://pan.baidu.com/s/1B0ZW3_Z3xcamUCniNjd10Q 提取码: avr2
- 将apache-activemq-5.12.0-bin.tar.gz 上传至Linux服务器
- 解压此文件 tar zxvf apache-activemq-5.12.0-bin.tar.gz
- 为apache-activemq-5.12.0目录赋权 chmod 777 apache-activemq-5.12.0
- 进入apache-activemq-5.12.0bin目录赋与执行权限 cd /usr/local/apache-activemq-5.12.0/bin chmod 755 activemq
- 启动 ./activemq start
- 在浏览器当中输入http://192.168.0.106:8161/ ( ip:8161)
- 进入管理页面
- 用户名和密码都是 admin
说明
- Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
- Number Of Consumers :消费者 这个是消费者端的消费者数量
- Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
- Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
消息服务器小例子
- 创建普通maven Jar工程
- 引入pom依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
点对点模式Queue
创建QueueProducer
代码语言:javascript复制public class QueueProducer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象, 指定发送的队列名称, 队列名称可以随意起名, 但是发送到哪里, 就要从哪里去接收
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}
创建QueueConsumer
代码语言:javascript复制public class QueueConsumer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:" textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
运行QueueProducer后运行QueueConsumer
订阅发布模式Topic
TopicConsumer1
代码语言:javascript复制public class TopicConsumer1 {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:" textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
TopicConsumer2
代码语言:javascript复制public class TopicConsumer2 {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:" textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
TopicProducer
代码语言:javascript复制public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}