大家好,又见面了,我是你们的朋友全栈君。
转载了好几篇关于mq的博文,但是总感觉对mq的理解使用都不到位。这里打算从原理到使用都从头来一遍。
1,原理
1.1通过类比理解mq
可以理解它是一个秘书,或是助手,你是老板,你告诉秘书说你要开会,那么秘书就会把开会的时间,地点,人员都安排好。你就省去了这些琐事,这有点类似于sping的面向切面。
当添加一个商品时,商品服务只需要告诉消息中间件MQ,MQ便去通知其它服务做各自该做的事情,比如通知搜索服务去同步索引库,通知redis服务去同步缓存,通知生成静态页面等等。
1.2常见的mq种类
mq的也被叫做中间件,种类有ActiveMQ,RabbitMQ,Kafka等,功能都差不多,这里我们学习ActiveMQ.
1,3什么是ActiveMQ?
它是Apache出品,最流行的,能力最强劲的开源消息总线。它完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
再者mq也可称为分布式消息队列,因为在mq的订阅式中有多个消费者异步处理多个请求,这就已经达到了分布式处理的目的。
1.4特点
(1)多种语言和协议编写客户端。语言: Java, C, C , C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2) 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) (3) 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 (4)通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 (5) 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA (6) 支持通过JDBC和journal提供高速的消息持久化 (7) 从设计上保证了高性能的集群,客户端-服务器,点对点 (8)支持Ajax (9)支持与Axis的整合 (10) 可以很容易得调用内嵌JMS provider,进行测试
ActiveMQ的消息形式
对于消息的传递有两种类型: 一种是点对点的,即一个生产者和一个消费者一一对应; 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。 JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。我们用的最多的也就是TextMessage而已。 · StreamMessage — Java原始值的数据流 · MapMessage–一套名称-值对 · TextMessage–一个字符串对象 · ObjectMessage–一个序列化的 Java对象 · BytesMessage–一个字节的数据流
我们可以通过下面一张图来加深理解,图上半部分是”发布/订阅者”模式,两个发布者各自发布了一条消息,每条消息都可以被多个Consumer接收到。图下半部分是”面对面”模式,两个发布者各自发布了一条消息,压入队列当中,队列的特点是先进先出,一旦有某个消费者拿走了一条消息,队列中就少了一条消息,剩下的消费者就不可能再消费那条消息了,因此也就做到了一对一。
二 安装ActiveMQ
我这里把mq安装在虚拟机上,当然虚拟机要能上网,还有jdk啥的这里就不说了。下载就到apache官网下载,地址:
http://activemq.apache.org/activemq-5112-release.html
找到如下图位置,点击下载:
下载好后上传,解压,
完了后我们到active目录下看到如下内容:
我们可以看到有一个名为activemq-all-5.12.0.jar的jar包,这个jar包,如果不与spring结合,只是简单用来当做activemq客户端的话,可以使用。如果要将activemq与spring整合的话,不要使用这个jar包,因为这个jar包当中包含了spring的包结构,而且里面的类与spring里面的类名称是一样的,但是方法不全,当我们将spring和activemq结合的时候,如果系统使用的是activemq的jar包当中的spring的类的话就会报错,启动都启动不了,而且错误还隐藏的特别深,难以捉摸其原因。因此整合的话,不要用这个jar包!!!activemq有一个版本5.11.2,里面没有spring的包结构,我们可以使用。
我们看下bin目录下的文件列表,如下图所示,其中activemq文件是用来启动activemq的。
conf目录存放的是一些配置文件,我们不用动,data目录存放的是服务端的缓存数据
webapps提供了管理的后台,如下所示。
3,不用做改动,直接启动mq
xiaoye@ubuntu3:~$ ./activemq/bin/activemq start
INFO: Loading ‘/home/xiaoye/activemq/bin/env’ INFO: Using java ‘/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java’ INFO: Starting – inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : ‘/home/xiaoye/activemq/data/activemq.pid’ (pid ‘1699’)
这样就启动成功了;
web访问一下,默认端口是8161
上面这个界面是没有用户登录的界面。下面我们用admin登录,默认的账号和密码都是admin
http://192.168.72.133:8161/admin/
打开这个链接会弹出输入账号密码的框,填进去就行了。
点击Quenes如下,这个是点对点消息发送界面
再点击topic是发布/订阅模式界面
在Send中可以测试发送点对点或发布/订阅两种消息,如下图所示。
三,代码测试ActiveMQ
下面我们要写java代码测试了。
新建一个maven工程。打开eclipse ,右键新建maven project –》
finish
修改pom.xml添加maven依赖,依赖我们到apeche,maven官网去找。可直接百度关键词,active maven,图下:
点开找到,我们下载active版本
把这个maven依赖拷过来:
代码语言:javascript复制<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
这样maven就能够自动帮我们下载active的jar包了。
下面新建一个junit测试类:
finish即可
在类中加入一下内容做简单测试,报错,我们
如下,鼠标防止@Test上,给提示导入Junit包,导入后,就没问题了。右键运行也是OK的。
下面我们就来写测试类,先来测试queue点对点的消息发送方式:
代码语言:javascript复制package com.xiaoyexinxin.activeMQTest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import junit.framework.TestCase;
/**
*
* @author liuxin
* @date 2018年4月10日
*/
public class TestActiveMq extends TestCase {
@Test
public void testQueueProducer() throws JMSException{
//创建一个链接工厂connectionFactory对象,需要指定mq服务ip和端口,注意brokerURL的开头是
//tcp://而不是我们通常的http://,端口是61616而不是我们访问activemq后台管理页面所使用的8161
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//使用connectionFactory 链接一个connection对象
Connection connection=connectionFactory.createConnection();
//开启链接,调用connection 对象的start方法
connection.start();
//使用connection 创建一个session对象
//第一个参数是是否开启事务,一般不使用分布式事务,因为它特别消耗性能,而且顾客体验特别差,现在互联网的
//做法是保证数据的最终一致(也就是允许暂时数据不一致),比如顾客下单购买东西,一旦订单生成完就立刻响应给用户
//下单成功。至于下单后一系列的操作,比如通知会计记账、通知物流发货、商品数量同步等等都先不用管,只需要
//发送一条消息到消息队列,消息队列来告知各模块进行相应的操作,一次告知不行就两次,直到完成所有相关操作为止,这
//也就做到了数据的最终一致性。如果第一个参数为true,那么第二个参数将会被忽略掉。如果第一个参数为false,那么
//第二个参数为消息的应答模式,常见的有手动和自动两种模式,我们一般使用自动模式。
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session对象创建一个Destination(目标)对象,两站形式,queue,topic两种,这里我们使用queue
//参数就是消息队列的名称
Queue queue=session.createQueue("test-queue");
//创建生成者,
MessageProducer producer=session.createProducer(queue);
//创建消息内容
//有两种方式,第一种方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二种方式:
TextMessage textMessage = session.createTextMessage("hello,activemq!!");
//发送消息
producer.send(textMessage);
//关闭资源,由内而外的关闭
producer.close();
session.close();
connection.close();
}
}
右键运行程序,成功后,在ActiveMQ的后台管理系统,点击”Queues”,可以看到我们刚才发送的那条消息”test-queue”。我们点击”test-queue”
点击test-queue,如下:Persistence为永久保存,priority优先级是4 ,Redelivered是否重复投递消息,这里是否,
接着点击,长串的id
保存了,打开日志看看是啥错;
xiaoye@ubuntu3:~/activemq cd data xiaoye@ubuntu3:~/activemq/data ls activemq.log activemq.pid audit.log kahadb
xiaoye@ubuntu3:~/activemq/data$ tail -200 activemq.log
看到日志有下图的错:
这个是activemq不支持jdk1.8造成的,这里我把虚拟机的jdk换成1.7的试试
官网下载后上传,我这里用的是谷歌浏览器下载,谷歌浏览器下载的jdk不知道为何,应该是tar.gz结尾的jdk,却只有gz.于是百度一圈,直接修改压缩包后缀,改为.tar结尾解压
xiaoye@ubuntu3:~/activemq/data cd ~ xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads
xiaoye@ubuntu3:~/Downloads$ ls apache-activemq-5.11.2-bin.tar.gz apache-activemq-5.15.3-bin.tar.gz hbase-1.0.0-cdh5.5.1.tar.gz jdk-7u80-linux-x64.tar.gz
apache-activemq-5.12.0-bin.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz hive-0.13.1-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz
解压到当前目录。
xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads ls apache-activemq-5.11.2-bin.tar.gz hive-0.13.1-cdh5.2.0.tar.gz apache-activemq-5.12.0-bin.tar.gz jdk1.7.0_79 apache-activemq-5.15.3-bin.tar.gz jdk-7u79-linux-x64.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz
hbase-1.0.0-cdh5.5.1.tar.gz
下面设置环境变量。
切换到root用户。
root@ubuntu3:~# vim /etc/profile
export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79
修改保存后,soruce /etc/profile
这个改完后,在修改当前用户下的环境变量。
xiaoye@ubuntu3:~$ vim .bashrc
export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79 export CLASSPATH=${JAVA_HOME}/lib
export PATH={JAVA_HOME}/bin:PATH
修改保存后,source .bashrc
启动activemq
xiaoye@ubuntu3:~$ ./activemq/bin/activemq start INFO: Loading ‘/home/xiaoye/activemq/bin/env’ INFO: Using java ‘/home/xiaoye/Downloads/jdk1.7.0_79/bin/java’
INFO: Process with pid ‘1454’ is already running
显示还在运行,那就kill -9 1454
再次重启,就好了。
这样在点击ID的时候就不会报错了。
四,消费者
下面我们写消费者方法,写在同生产者一个类里面;
内容如下:
代码语言:javascript复制/*
* 消费者
*/
@Test
public void testQueueConsumer() throws Exception{
//跟创建生产者一样,先连接mq
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//连接一个connection 对象
Connection connection=connectionFactory.createConnection();
//开启链接
connection.start();
//创建一个session会话对象
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session对象创建一个Destination对象,两种形式queue,topic ,这里我们使用queue
//参数是消息队列的名称
Queue queue=session.createQueue("test-queue");
//使用session创建一个consumer对象
MessageConsumer consumer=session.createConsumer(queue);
//向Consumer对象中设置一个Messagelistener对象,用来接受消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
if(arg0 instanceof TextMessage){
TextMessage textMessage=(TextMessage) arg0;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//程序等待接收用户结束操作
//程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,
//当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
右键方法运行消费者方法,会在控制台看到生产者发送的hello world消息,如下:
这里执行消费者方法后并没有停止运行,还在等待新新的消息进来,那么我们右键生产者方法再次运行会发现有两个hello输出。
我们修改一下生产者内容,再次运行。
发现有一个语句输出,说明没有问题。
我们到activeMQ后台管理页面看看
说一下这几个标签的含义,number of pending messages 待发送消息数
Number Of Consumers 消费者消息数
Messages Enqueued 压入队列的消息数量
Messages Dequeued 出队列的消息数量,也就是被消费的消息数
五,topic 发布/订阅模式
生产者代码:
代码语言:javascript复制 /*
* 订阅模式的生产者
*/
@Test
public void testTopicProducer() throws JMSException{
//创建工程连接mq
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//使用工程创建一个连接对象
Connection connection=connectionFactory.createConnection();
//开启链接
connection.start();
//使用链接创建一个会话
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用会话创建一个目标对象
Topic topic=session.createTopic("test-topic");
//创建一个生产者
MessageProducer producer=session.createProducer(topic);
//7.创建一个TextMessage对象 ,并写入要传输的消息内容
//有两种方式,第一种方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二种方式:
TextMessage textMessage=session.createTextMessage("hello topic!");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
运行上面的测试方法,运行成功后,我们访问activemq的管理后台页面,点击”Topics”,可以看到有”test-topic”这一行,压入消息队列一条消息,但由于没有消费者,因此没有消费掉该消息。
点开test-topic发现:消息体里并没有我们发送的内容。
而queue就不同,queue有持久化一栏,发送的消息会被保存下来。这样的话,就会有个问题,那就是如果发送topic消息时没有消费者,那么这条消息便不存在了,不会再被消费了。因此我们要想消息不会被遗失掉,我们要先打开消费者,然后再发送topic消息。
我们来写消费topic消息的方法,如下图所示,该方法与我们上面学习的消费队列消息的方法不同的是创建Destination的时候不一样,同时为了模拟多个消费者,在该方法中添加一条输出信息,标明该方法是第几个消费者。
消费者代码:
代码语言:javascript复制/*
* 订阅模式的消费者
*/
@Test
public void testTopicConsumer() throws JMSException{
//创建工厂,链接mq
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//使用工程创建一个链接
Connection connection=connectionFactory.createConnection();
//打开链接
connection.start();
//使用链接创建一个会话
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息队列
Topic topic=session.createTopic("test-topic");
//消费者
MessageConsumer consumer=session.createConsumer(topic);
//消费者设置监听器,监听传来的消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
if(arg0 instanceof TextMessage){
TextMessage textMessage=(TextMessage) arg0;
try {
String text=textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//程序等待接收用户结束操作
//程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,
//当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。
System.out.println("topic消费者1111。。。。。");
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
右键运行消费者方法:
修改为2222.。。。。。
再次运行消费者方法。
修改为33333.。。。。
再次运行消费者方法
发现这里消费者的数量是3 了
启动了三个消费者后,我们再发送一次topic消息,发完之后,我们看各个控制台的信息。如下图所示。可以看到都打印出了我们发送的topic信息。
三个进程控制台都有打印生产者消息。
六,topic消息持久化
topic消息没有持久化,也就意味着,如果消息发送者发送消息的时候,如果消费者没有运行的话,它将无法消费这个消息了(即使它启动也无法再接收到那条topic消息了),这样问题就来了,如果那条消息非常重要呢?我们不能容忍接收不到消息的情况。
生产者代码:
代码语言:javascript复制/*
* 订阅发布式 可持久化生产者
*/
@Test
public void TestTopicPersistenceProducer() throws JMSException{
//创建工程连接mq
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//设置异步发送消息可显著提高发送性能
connectionFactory.setUseAsyncSend(true);
//使用工程创建一个连接对象
Connection connection=connectionFactory.createConnection();
//对每个生产者来说其clientID值必须唯一
connection.setClientID("producerTopic2");
//开启链接
connection.start();
//使用链接创建一个会话
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用会话创建一个目标对象
Topic topic=session.createTopic("test-topic");
//创建一个生产者
MessageProducer producer=session.createProducer(topic);
//DelieveryMode设置为PERSISTENCE(持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//创建一个TextMessage对象 ,并写入要传输的消息内容
//有两种方式,第一种方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二种方式:
TextMessage textMessage=session.createTextMessage("hello topic!persistence2");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
消费者代码:
代码语言:javascript复制/*
* 订阅发布式 可持久化消费者
*/
@Test
public void TestTopicPersistenceConsumer() throws JMSException{
//创建工厂,链接mq
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//设置异步接受消息,可提高接受性能
connectionFactory.setUseAsyncSend(true);
//使用工程创建一个链接
Connection connection=connectionFactory.createConnection();
//设置每个消费者id,每个都要不同
connection.setClientID("consumer1");
//打开链接
connection.start();
//使用链接创建一个会话
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息队列
Topic topic=session.createTopic("test-topic");
//消费者
MessageConsumer consumer=session.createDurableSubscriber(topic, "consumer1");
//消费者设置监听器,监听传来的消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
if(arg0 instanceof TextMessage){
TextMessage textMessage=(TextMessage) arg0;
try {
String text=textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//程序等待接收用户结束操作
//程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,
//当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。
System.out.println("topic消费者3333。。。。。");
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
我们还需要配置下activemq的activemq.xml文件,只需要添加一句配置,就是在<broker的末尾添加一句关于持久化的配置persistent=”true”即可。如下:
代码语言:javascript复制<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">
然后重新启动mq;
这样设置持久化了就无所谓哪个先启动了。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/106159.html原文链接:https://javaforall.cn