在项目中导入依赖坐标
代码语言:javascript复制 <!--使用消息队列,导入依赖坐标-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
使用队列queue的情况
producer端
代码语言:javascript复制 public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
try {
//创建连接对象
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//使用队列queue
Queue testqueue = session.createQueue("boss drink");
//创建提供者
MessageProducer producer = session.createProducer(testqueue);
TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("我渴了,我要喝水!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();// 事务型消息,必须提交后才生效
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
consumer端1
代码语言:javascript复制 public static void main(String[] args) {
// 连接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("boss drink");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text ",员工1马上拿起杯子打水。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
consumer端2
代码语言:javascript复制 public static void main(String[] args) {
// 连接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("boss drink");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text ",员工2马上拿起杯子打水。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
使用topic话题的情况:
producer端:
代码语言:javascript复制 public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
try {
//创建连接对象
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//使用topic话题(订阅)
Topic topic = session.createTopic("boss speak");
//创建提供者
MessageProducer producer = session.createProducer(topic);
TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("我们要为中国的伟大复兴而努力奋斗!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();// 事务型消息,必须提交后才生效
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
consumer端1:
代码语言:javascript复制 public static void main(String[] args) {
// 连接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
//设置客户端id
connection.setClientID("userOne");
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("boss speak");
//在消费端标记,Session.createDurableSubscriber(Topic topic, String name)
//是发布-订阅持久化的接收端的设置。
//参数 topic -> 与发送端的topic 对应,建立连接
//参数 name -> 为订阅者的标识(相当于id)
MessageConsumer consumer =session.createDurableSubscriber(topic,"userOne");
//MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text ",员工1这个月工资不要了。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
consumer端2:
代码语言:javascript复制 public static void main(String[] args) {
// 连接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//Destination :目标
Destination topic = session.createTopic("boss speak");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text ",员工2周末主动来加班。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
关于事务控制
producer提交时的事务 | 事务开启 | 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。 |
---|---|---|
事务不开启 | 只要执行send,就进入到队列中。 | |
consumer 接收时的事务 | 事务开启,签收必须写Session.SESSION_TRANSACTED | 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。 |
事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE | 只要调用comsumer.receive方法 ,自动确认。 | |
事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE | 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。 | |
事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE | 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。 |
持久化与非持久化
通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置
持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。
与s pring boot整合
在application.properties文件中加入spring.activemq.broker-url****=tcp://mq.server.com:61616
配置类ActiveMQConfig:项目启动的时候加载并执行里面所有的方法
代码语言:javascript复制@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url:disabled}")
String brokerURL ;
@Value("${activemq.listener.enable:disabled}")
String listenerEnable;
@Bean
public ActiveMQUtil getActiveMQUtil() throws JMSException {
if(brokerURL.equals("disabled")){
return null;
}
ActiveMQUtil activeMQUtil=new ActiveMQUtil();
activeMQUtil.init(brokerURL);
return activeMQUtil;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if(!listenerEnable.equals("true")){
return null;
}
factory.setConnectionFactory(activeMQConnectionFactory);
//设置并发数
factory.setConcurrency("5");
//重连间隔时间
factory.setRecoveryInterval(5000L);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
/* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
url=brokerURL;
}*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory( brokerURL);
return activeMQConnectionFactory;
}
}
工具类ActiveMQUtil
代码语言:javascript复制public class ActiveMQUtil {
PooledConnectionFactory pooledConnectionFactory=null;
public ConnectionFactory init(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
//加入连接池
pooledConnectionFactory=new PooledConnectionFactory(factory);
//出现异常时重新连接
pooledConnectionFactory.setReconnectOnException(true);
//
pooledConnectionFactory.setMaxConnections(5);
pooledConnectionFactory.setExpiryTimeout(10000);
return pooledConnectionFactory;
}
public ConnectionFactory getConnectionFactory(){
return pooledConnectionFactory;
}
}
案例:
controller:
代码语言:javascript复制@RequestMapping("/alipay/callback/return")
public String callBackReturn(HttpServletRequest request,Map<String,String> paramsMap){// 页面同步反转的回调
String out_trade_no = request.getParameter("out_trade_no");
String trade_no = request.getParameter("trade_no");
String sign = request.getParameter("sign");
try {
boolean b = AlipaySignature.rsaCheckV1(paramsMap,AlipayConfig.alipay_public_key,AlipayConfig.charset,AlipayConfig.sign_type);// 对支付宝回调签名的校验
} catch (AlipayApiException e) {
e.printStackTrace();
}
// 修改支付信息
PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setPaymentStatus("已支付");
paymentInfo.setCallbackContent(request.getQueryString());
paymentInfo.setOutTradeNo(out_trade_no);
paymentInfo.setAlipayTradeNo(trade_no);
paymentInfo.setCallbackTime(new Date());
//这里使用Queue队列
// 发送系统消息,出发并发商品支付业务服务O2O消息队列
paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),trade_no);
paymentService.updatePayment(paymentInfo);
return "finish";
}
servcieimpl:
代码语言:javascript复制@Override
public void sendPaymentSuccess(String outTradeNo, String paymentStatus,String trackingNo) {
try {
// 连接消息服务器
ConnectionFactory connect = activeMQUtil.getConnectionFactory();
Connection connection = connect.createConnection();
connection.start();
//第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// 发送消息
Queue testqueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");
MessageProducer producer = session.createProducer(testqueue);
MapMessage mapMessage=new ActiveMQMapMessage();
mapMessage.setString("out_trade_no",outTradeNo);
mapMessage.setString("payment_status",paymentStatus);
mapMessage.setString("tracking_no",trackingNo);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(mapMessage);
session.commit();// 事务型消息,必须提交后才生效
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}